Skip to content

Commit

Permalink
add disk metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyachakilam committed May 28, 2024
1 parent ee0bada commit 47cca7f
Show file tree
Hide file tree
Showing 16 changed files with 633 additions and 29 deletions.
8 changes: 7 additions & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,14 @@ These metrics are available on operating systems with the cgroup kernel feature.
|------|-----------|----------|------------|
|`cgroup/cpu/shares`|Relative value of CPU time available to this process. Read from `cpu.shares`.||Varies|
|`cgroup/cpu/cores_quota`|Number of cores available to this process. Derived from `cpu.cfs_quota_us`/`cpu.cfs_period_us`.||Varies. A value of -1 indicates there is no explicit quota set.|
|`cgroup/cpu/usage/percentage`|Total cpu percentage used by cgroup of process that is running||0-100|
|`cgroup/cpu/usage/total/percentage`|Total cpu percentage used by cgroup of process that is running||0-100|
|`cgroup/cpu/usage/user/percentage`|User cpu percentage used by cgroup of process that is running||0-100|
|`cgroup/cpu/usage/sys/percentage`|Sys cpu percentage used by cgroup of process that is running||0-100|
|`cgroup/cpuacct/usage/timeNs`|Reports the total CPU time (in nanoseconds) consumed by all tasks in this process||Varies|
|`cgroup/disk/read/size`|Reports the number of bytes transferred to specific devices by a cgroup of process that is running.|`diskName`|Varies|
|`cgroup/disk/write/size`|Reports the number of bytes transferred from specific devices by a cgroup of process that is running.|`diskName`|Varies|
|`cgroup/disk/read/count`|Reports the number of read operations performed on specific devices by a cgroup of process that is running.|`diskName`|Varies|
|`cgroup/disk/write/count`|Reports the number of write operations performed on specific devices by a cgroup of process that is running.|`diskName`|Varies|
|`cgroup/memory/*`|Memory stats for this process, such as `cache` and `total_swap`. Each stat produces a separate metric. Read from `memory.stat`.||Varies|
|`cgroup/memory_numa/*/pages`|Memory stats, per NUMA node, for this process, such as `total` and `unevictable`. Each stat produces a separate metric. Read from `memory.num_stat`.|`numaZone`|Varies|
|`cgroup/memory/limit/bytes`|Reports the maximum memory that can be used by processes in the cgroup (in bytes)||Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,52 @@
package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Cpu;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Map;

public class CgroupCpuMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupCpuMonitor.class);
private static final Long DEFAULT_USER_HZ = 100L;
public static final String TOTAL_USAGE_METRIC = "cgroup/cpu/usage/total/percentage";
public static final String USER_USAGE_METRIC = "cgroup/cpu/usage/user/percentage";
public static final String SYS_USAGE_METRIC = "cgroup/cpu/usage/sys/percentage";
private static final String TOTAL = "total";
private static final String USER = "user";
private static final String SYSTEM = "system";
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private long previousUsage;
private long previousSnapshotAt = 0L;
private Long userHz;
private KeyedDiff jiffies = new KeyedDiff();
private long prevJiffiesSnapshotAt = 0;

public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
try {
Process process = Runtime.getRuntime().exec("getconf CLK_TCK");
userHz = Long.parseLong(new BufferedReader(new InputStreamReader(process.getInputStream())).readLine());
}
catch (IOException | NumberFormatException e) {
LOG.warn(e, "Error getting the USER_HZ value");
}
finally {
if (userHz == null) {
LOG.warn("Using default value for USER_HZ");
userHz = DEFAULT_USER_HZ;
}
}
}

public CgroupCpuMonitor(final Map<String, String[]> dimensions, String feed)
Expand All @@ -62,7 +88,7 @@ public boolean doMonitor(ServiceEmitter emitter)
{
final Cpu cpu = new Cpu(cgroupDiscoverer);
final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot();
long now = Instant.now().getNano();
long now = Instant.now().getEpochSecond();

final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
Expand All @@ -71,16 +97,27 @@ public boolean doMonitor(ServiceEmitter emitter)
"cgroup/cpu/cores_quota",
computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs())
));
emitter.emit(builder.setMetric("cgroup/cpuacct/usage/timeNs", cpuSnapshot.getUsageNs()));

if (previousSnapshotAt > 0) {
long currentUsage = cpu.snapshot().getUsageNs();
double usagePercentage = 100.0 * (currentUsage - previousUsage) / (now - previousSnapshotAt);
emitter.emit(builder.setMetric("cgroup/cpu/usage/percentage", usagePercentage));
previousUsage = currentUsage;
long elapsedJiffiesSnapshotSecs = now - prevJiffiesSnapshotAt;
if (elapsedJiffiesSnapshotSecs > 0) {
prevJiffiesSnapshotAt = now;
final Map<String, Long> elapsedJiffies = jiffies.to(
"usage",
ImmutableMap.<String, Long>builder()
.put(USER, cpuSnapshot.getUserJiffies())
.put(SYSTEM, cpuSnapshot.getSystemJiffies())
.put(TOTAL, cpuSnapshot.getTotalJiffies())
.build()
);
if (elapsedJiffies != null) {
double totalUsagePct = 100.0 * elapsedJiffies.get(TOTAL) / userHz / elapsedJiffiesSnapshotSecs;
double sysUsagePct = 100.0 * elapsedJiffies.get(SYSTEM) / userHz / elapsedJiffiesSnapshotSecs;
double userUsagePct = 100.0 * elapsedJiffies.get(USER) / userHz / elapsedJiffiesSnapshotSecs;
emitter.emit(builder.setMetric(TOTAL_USAGE_METRIC, totalUsagePct));
emitter.emit(builder.setMetric(SYS_USAGE_METRIC, sysUsagePct));
emitter.emit(builder.setMetric(USER_USAGE_METRIC, userUsagePct));
}
}

previousSnapshotAt = now;
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Disk;

import java.util.Map;

public class CgroupDiskMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private final KeyedDiff diff = new KeyedDiff();

public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;
}

@Override
public boolean doMonitor(ServiceEmitter emitter)
{
Map<String, Disk.Metrics> snapshot = new Disk(cgroupDiscoverer).snapshot();
for (String disk : snapshot.keySet()) {
Disk.Metrics metrics = snapshot.get(disk);
final Map<String, Long> stats = diff.to(
metrics.getDiskName(),
ImmutableMap.<String, Long>builder()
.put("cgroup/disk/read/bytes", metrics.getReadBytes())
.put("cgroup/disk/read/count", metrics.getReadCount())
.put("cgroup/disk/write/bytes", metrics.getWriteBytes())
.put("cgroup/disk/write/count", metrics.getWriteCount())
.build()
);

if (stats != null) {
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", metrics.getDiskName());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
for (Map.Entry<String, Long> entry : stats.entrySet()) {
emitter.emit(builder.setMetric(entry.getKey(), entry.getValue()));
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,24 @@

package org.apache.druid.java.util.metrics.cgroups;

import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.CgroupUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Pattern;

/**
* Collect CPU share and quota information from cpu cgroup files.
*/
public class Cpu
{
private static final Logger LOG = new Logger(Cpu.class);
private static final String CGROUP = "cpu";
private static final String CPUACCT_USAGE_FILE = "cpuacct.usage";
private static final String CPUACCT_STAT_FILE = "cpuacct.stat";
private static final String CPU_SHARES_FILE = "cpu.shares";
private static final String CPU_QUOTA_FILE = "cpu.cfs_quota_us";
private static final String CPU_PERIOD_FILE = "cpu.cfs_period_us";
Expand All @@ -46,11 +55,37 @@ public Cpu(CgroupDiscoverer cgroupDiscoverer)
*/
public CpuMetrics snapshot()
{
long userJiffies = -1L;
long systemJiffies = -1L;
try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CPUACCT_STAT_FILE)
)) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
final String[] parts = line.split(Pattern.quote(" "));
if (parts.length != 2) {
// ignore
continue;
}
switch (parts[0]) {
case "user":
userJiffies = Longs.tryParse(parts[1]);
break;
case "system":
systemJiffies = Longs.tryParse(parts[1]);
break;
}
}
}
catch (IOException | RuntimeException ex) {
LOG.error(ex, "Unable to fetch memory snapshot");
}


return new CpuMetrics(
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_SHARES_FILE, -1),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_QUOTA_FILE, 0),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_PERIOD_FILE, 0),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPUACCT_USAGE_FILE, -1)
systemJiffies, userJiffies
);
}

Expand All @@ -67,15 +102,19 @@ public static class CpuMetrics
// bandwidth decisions
private final long periodUs;

// Maps to cpuacct.uage - the total CPU time (in nanoseconds) consumed by all tasks in this cgroup
private final long usageNs;
// Maps to user value at cpuacct.stat
private final long userJiffies;

// Maps to system value at cpuacct.stat
private final long systemJiffies;

CpuMetrics(long shares, long quotaUs, long periodUs, long usageNs)
CpuMetrics(long shares, long quotaUs, long periodUs, long systemJiffis, long userJiffies)
{
this.shares = shares;
this.quotaUs = quotaUs;
this.periodUs = periodUs;
this.usageNs = usageNs;
this.userJiffies = userJiffies;
this.systemJiffies = systemJiffis;
}

public final long getShares()
Expand All @@ -93,9 +132,19 @@ public final long getPeriodUs()
return periodUs;
}

public final long getUsageNs()
public long getUserJiffies()
{
return userJiffies;
}

public long getSystemJiffies()
{
return systemJiffies;
}

public long getTotalJiffies()
{
return usageNs;
return userJiffies + systemJiffies;
}
}
}
Loading

0 comments on commit 47cca7f

Please sign in to comment.