Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cgroup cpu/mem/disk usage metrics #16472

Merged
merged 15 commits into from
May 29, 2024
4 changes: 4 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,12 @@ These metrics are available on operating systems with the cgroup kernel feature.
|------|-----------|----------|------------|
|`cgroup/cpu/shares`|Relative value of CPU time available to this process. Read from `cpu.shares`.||Varies|
|`cgroup/cpu/cores_quota`|Number of cores available to this process. Derived from `cpu.cfs_quota_us`/`cpu.cfs_period_us`.||Varies. A value of -1 indicates there is no explicit quota set.|
|`cgroup/cpu/usage/percentage`|Total cpu percentage used by cgroup of process that is running||0-100|
adithyachakilam marked this conversation as resolved.
Show resolved Hide resolved
|`cgroup/cpuacct/usage/timeNs`|Reports the total CPU time (in nanoseconds) consumed by all tasks in this process||Varies|
|`cgroup/memory/*`|Memory stats for this process, such as `cache` and `total_swap`. Each stat produces a separate metric. Read from `memory.stat`.||Varies|
|`cgroup/memory_numa/*/pages`|Memory stats, per NUMA node, for this process, such as `total` and `unevictable`. Each stat produces a separate metric. Read from `memory.num_stat`.|`numaZone`|Varies|
|`cgroup/memory/usage/bytes`|Reports the maximum memory used by processes in the cgroup (in bytes)||Varies|
|`cgroup/memory/usage/bytes`|Reports the maximum amount of user memory (including file cache)||Varies|
|`cgroup/cpuset/cpu_count`|Total number of CPUs available to the process. Derived from `cpuset.cpus`.||Varies|
|`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
|`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.Cpu;

import java.time.Instant;
import java.util.Map;

public class CgroupCpuMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private long previousUsage;
private long previousSnapshotAt = 0L;

public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
Expand Down Expand Up @@ -58,7 +61,8 @@ public CgroupCpuMonitor()
public boolean doMonitor(ServiceEmitter emitter)
{
final Cpu cpu = new Cpu(cgroupDiscoverer);
final Cpu.CpuAllocationMetric cpuSnapshot = cpu.snapshot();
final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot();
long now = Instant.now().getNano();

final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
Expand All @@ -67,7 +71,16 @@ public boolean doMonitor(ServiceEmitter emitter)
"cgroup/cpu/cores_quota",
computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs())
));
emitter.emit(builder.setMetric("cgroup/cpuacct/usage/timeNs", cpuSnapshot.getUsageNs()));

if (previousSnapshotAt > 0) {
long currentUsage = cpu.snapshot().getUsageNs();
double usagePercentage = 100.0 * (currentUsage - previousUsage) / (now - previousSnapshotAt);
emitter.emit(builder.setMetric("cgroup/cpu/usage/percentage", usagePercentage));
previousUsage = currentUsage;
}

previousSnapshotAt = now;
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,18 @@ public boolean doMonitor(ServiceEmitter emitter)
{
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot();
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage()));
emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit()));

stat.getMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
// See https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
// There are inconsistent units for these. Most are bytes.
emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory/%s", key), value));
});
stat.getNumaMemoryStats().forEach((key, value) -> {
final ServiceMetricEvent.Builder builder = builder().setDimension("numaZone", Long.toString(key));
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder().setDimension("numaZone", Long.toString(key));
value.forEach((k, v) -> emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory_numa/%s/pages", k), v)));
});
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,32 @@

package org.apache.druid.java.util.metrics;

import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

public class CgroupUtil
{
private static final Logger LOG = new Logger(CgroupUtil.class);
public static final String SPACE_MATCH = Pattern.quote(" ");
public static final String COMMA_MATCH = Pattern.quote(",");

public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, String fileName, long defaultValue)
{
try {
List<String> lines = Files.readAllLines(Paths.get(discoverer.discover(cgroup).toString(), fileName));
return lines.stream().map(Longs::tryParse).filter(Objects::nonNull).findFirst().orElse(defaultValue);
}
catch (RuntimeException | IOException ex) {
LOG.warn(ex, "Unable to fetch %s", fileName);
return defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,15 @@

package org.apache.druid.java.util.metrics.cgroups;

import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import org.apache.druid.java.util.metrics.CgroupUtil;

/**
* Collect CPU share and quota information from cpu cgroup files.
*/
public class Cpu
{
private static final Logger LOG = new Logger(Cpu.class);
private static final String CGROUP = "cpu";
private static final String CPUACCT_USAGE_FILE = "cpuacct.usage";
private static final String CPU_SHARES_FILE = "cpu.shares";
private static final String CPU_QUOTA_FILE = "cpu.cfs_quota_us";
private static final String CPU_PERIOD_FILE = "cpu.cfs_period_us";
Expand All @@ -51,28 +44,17 @@ public Cpu(CgroupDiscoverer cgroupDiscoverer)
*
* @return A snapshot with the data populated.
*/
public CpuAllocationMetric snapshot()
public CpuMetrics snapshot()
{
return new CpuAllocationMetric(
readLongValue(CPU_SHARES_FILE, -1),
readLongValue(CPU_QUOTA_FILE, 0),
readLongValue(CPU_PERIOD_FILE, 0)
return new CpuMetrics(
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_SHARES_FILE, -1),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_QUOTA_FILE, 0),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_PERIOD_FILE, 0),
CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPUACCT_USAGE_FILE, -1)
);
}

private long readLongValue(String fileName, long defaultValeue)
{
try {
List<String> lines = Files.readAllLines(Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), fileName));
return lines.stream().map(Longs::tryParse).filter(Objects::nonNull).findFirst().orElse(defaultValeue);
}
catch (RuntimeException | IOException ex) {
LOG.error(ex, "Unable to fetch %s", fileName);
return defaultValeue;
}
}

public static class CpuAllocationMetric
public static class CpuMetrics
{
// Maps to cpu.shares - the share of CPU given to the process
private final long shares;
Expand All @@ -85,11 +67,15 @@ public static class CpuAllocationMetric
// bandwidth decisions
private final long periodUs;

CpuAllocationMetric(long shares, long quotaUs, long periodUs)
// Maps to cpuacct.uage - the total CPU time (in nanoseconds) consumed by all tasks in this cgroup
private final long usageNs;

CpuMetrics(long shares, long quotaUs, long periodUs, long usageNs)
{
this.shares = shares;
this.quotaUs = quotaUs;
this.periodUs = periodUs;
this.usageNs = usageNs;
}

public final long getShares()
Expand All @@ -106,5 +92,10 @@ public final long getPeriodUs()
{
return periodUs;
}

public final long getUsageNs()
{
return usageNs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.CgroupUtil;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -36,6 +37,8 @@ public class Memory
private static final Logger LOG = new Logger(Memory.class);
private static final String CGROUP = "memory";
private static final String CGROUP_MEMORY_FILE = "memory.stat";
private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes";
private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes";
private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat";
private final CgroupDiscoverer cgroupDiscoverer;

Expand All @@ -47,6 +50,8 @@ public Memory(CgroupDiscoverer cgroupDiscoverer)
public MemoryStat snapshot()
{
final MemoryStat memoryStat = new MemoryStat();
memoryStat.usage = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_USAGE_FILE, -1);
memoryStat.limit = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_LIMIT_FILE, -1);

try (final BufferedReader reader = Files.newBufferedReader(
Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE)
Expand Down Expand Up @@ -102,6 +107,8 @@ public static class MemoryStat
{
private final Map<String, Long> memoryStats = new HashMap<>();
private final Map<Long, Map<String, Long>> numaMemoryStats = new HashMap<>();
private long usage;
private long limit;

public Map<String, Long> getMemoryStats()
{
Expand All @@ -113,5 +120,15 @@ public Map<Long, Map<String, Long>> getNumaMemoryStats()
// They can modify the inner map... but why?
return ImmutableMap.copyOf(numaMemoryStats);
}

public long getUsage()
{
return usage;
}

public long getLimit()
{
return limit;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void setUp() throws IOException
TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares"));
TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us"));
TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us"));
TestUtils.copyOrReplaceResource("/cpuacct.usage", new File(cpuDir, "cpuacct.usage"));
}

@Test
Expand All @@ -72,13 +73,16 @@ public void testMonitor()
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(2, actualEvents.size());
Assert.assertEquals(3, actualEvents.size());
final Map<String, Object> sharesEvent = actualEvents.get(0).toMap();
final Map<String, Object> coresEvent = actualEvents.get(1).toMap();
final Map<String, Object> usageEvent = actualEvents.get(2).toMap();
Assert.assertEquals("cgroup/cpu/shares", sharesEvent.get("metric"));
Assert.assertEquals(1024L, sharesEvent.get("value"));
Assert.assertEquals("cgroup/cpu/cores_quota", coresEvent.get("metric"));
Assert.assertEquals(3.0D, coresEvent.get("value"));
Assert.assertEquals("cgroup/cpuacct/usage/timeNs", usageEvent.get("metric"));
Assert.assertEquals(5000000L, usageEvent.get("value"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public void setUp() throws IOException
FileUtils.mkdirp(memoryDir);
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes"));
TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes"));
}

@Test
Expand All @@ -70,6 +72,6 @@ public void testMonitor()
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(44, actualEvents.size());
Assert.assertEquals(46, actualEvents.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void setUp() throws IOException
TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares"));
TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us"));
TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us"));
TestUtils.copyOrReplaceResource("/cpuacct.usage", new File(cpuDir, "cpuacct.usage"));
}

@Test
Expand All @@ -60,19 +61,21 @@ public void testWontCrash()
final Cpu cpu = new Cpu(cgroup -> {
throw new RuntimeException("Should still continue");
});
final Cpu.CpuAllocationMetric metric = cpu.snapshot();
final Cpu.CpuMetrics metric = cpu.snapshot();
Assert.assertEquals(-1L, metric.getShares());
Assert.assertEquals(0, metric.getQuotaUs());
Assert.assertEquals(0, metric.getPeriodUs());
Assert.assertEquals(-1, metric.getUsageNs());
}

@Test
public void testSimpleLoad()
{
final Cpu cpu = new Cpu(discoverer);
final Cpu.CpuAllocationMetric snapshot = cpu.snapshot();
final Cpu.CpuMetrics snapshot = cpu.snapshot();
Assert.assertEquals(1024, snapshot.getShares());
Assert.assertEquals(300000, snapshot.getQuotaUs());
Assert.assertEquals(100000, snapshot.getPeriodUs());
Assert.assertEquals(5000000, snapshot.getUsageNs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void setUp() throws Exception
FileUtils.mkdirp(memoryDir);
TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat"));
TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat"));
TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes"));
TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes"));
}

@Test
Expand All @@ -75,6 +77,10 @@ public void testSimpleSnapshot()
{
final Memory memory = new Memory(discoverer);
final Memory.MemoryStat stat = memory.snapshot();

Assert.assertEquals(5000000, stat.getUsage());
Assert.assertEquals(8000000, stat.getLimit());

final Map<String, Long> expectedMemoryStats = new HashMap<>();
expectedMemoryStats.put("inactive_anon", 0L);
expectedMemoryStats.put("total_pgfault", 13137L);
Expand Down
1 change: 1 addition & 0 deletions processing/src/test/resources/cpuacct.usage
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5000000
1 change: 1 addition & 0 deletions processing/src/test/resources/memory.limit_in_bytes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8000000
1 change: 1 addition & 0 deletions processing/src/test/resources/memory.usage_in_bytes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5000000