diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c618e196dbca..e1306acf8714 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -394,6 +394,7 @@ Metric monitoring is an essential part of Druid operations. The following monito |`org.apache.druid.java.util.metrics.JvmThreadsMonitor`|Reports Thread statistics in the JVM, like numbers of total, daemon, started, died threads.| |`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup.| |`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.| +|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.| |`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.| |`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.| |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 51041952cfae..bf241ac5708a 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -516,8 +516,17 @@ These metrics are available on operating systems with the cgroup kernel feature. |------|-----------|----------|------------| |`cgroup/cpu/shares`|Relative value of CPU time available to this process. Read from `cpu.shares`.||Varies| |`cgroup/cpu/cores_quota`|Number of cores available to this process. Derived from `cpu.cfs_quota_us`/`cpu.cfs_period_us`.||Varies. A value of -1 indicates there is no explicit quota set.| +|`cgroup/cpu/usage/total/percentage`|Total cpu percentage used by cgroup of process that is running||0-100| +|`cgroup/cpu/usage/user/percentage`|User cpu percentage used by cgroup of process that is running||0-100| +|`cgroup/cpu/usage/sys/percentage`|Sys cpu percentage used by cgroup of process that is running||0-100| +|`cgroup/disk/read/size`|Reports the number of bytes transferred to specific devices by a cgroup of process that is running.|`diskName`|Varies| +|`cgroup/disk/write/size`|Reports the number of bytes transferred from specific devices by a cgroup of process that is running.|`diskName`|Varies| +|`cgroup/disk/read/count`|Reports the number of read operations performed on specific devices by a cgroup of process that is running.|`diskName`|Varies| +|`cgroup/disk/write/count`|Reports the number of write operations performed on specific devices by a cgroup of process that is running.|`diskName`|Varies| |`cgroup/memory/*`|Memory stats for this process, such as `cache` and `total_swap`. Each stat produces a separate metric. Read from `memory.stat`.||Varies| |`cgroup/memory_numa/*/pages`|Memory stats, per NUMA node, for this process, such as `total` and `unevictable`. Each stat produces a separate metric. Read from `memory.num_stat`.|`numaZone`|Varies| +|`cgroup/memory/limit/bytes`|Reports the maximum memory that can be used by processes in the cgroup (in bytes)||Varies| +|`cgroup/memory/usage/bytes`|Reports the maximum amount of user memory (including file cache)||Varies| |`cgroup/cpuset/cpu_count`|Total number of CPUs available to the process. Derived from `cpuset.cpus`.||Varies| |`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies| |`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies| diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java index af2408a156ed..9486bc532822 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java @@ -20,28 +20,64 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.Cpu; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Map; public class CgroupCpuMonitor extends FeedDefiningMonitor { + private static final Logger LOG = new Logger(CgroupCpuMonitor.class); + private static final Long DEFAULT_USER_HZ = 100L; + public static final String TOTAL_USAGE_METRIC = "cgroup/cpu/usage/total/percentage"; + public static final String USER_USAGE_METRIC = "cgroup/cpu/usage/user/percentage"; + public static final String SYS_USAGE_METRIC = "cgroup/cpu/usage/sys/percentage"; + private static final String TOTAL = "total"; + private static final String USER = "user"; + private static final String SYSTEM = "system"; final CgroupDiscoverer cgroupDiscoverer; final Map dimensions; + private Long userHz; + private KeyedDiff jiffies = new KeyedDiff(); + private long prevJiffiesSnapshotAt = 0; public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; this.dimensions = dimensions; + try { + Process p = new ProcessBuilder("getconf", "CLK_TCK").start(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) { + String line = in.readLine(); + if (line != null) { + userHz = Long.valueOf(line.trim()); + } + } + } + catch (IOException | NumberFormatException e) { + LOG.warn(e, "Error getting the USER_HZ value"); + } + finally { + if (userHz == null) { + LOG.warn("Using default value for USER_HZ"); + userHz = DEFAULT_USER_HZ; + } + } } public CgroupCpuMonitor(final Map dimensions, String feed) { - this(null, dimensions, feed); + this(new ProcSelfCgroupDiscoverer(), dimensions, feed); } public CgroupCpuMonitor(final Map dimensions) @@ -58,7 +94,8 @@ public CgroupCpuMonitor() public boolean doMonitor(ServiceEmitter emitter) { final Cpu cpu = new Cpu(cgroupDiscoverer); - final Cpu.CpuAllocationMetric cpuSnapshot = cpu.snapshot(); + final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot(); + long now = Instant.now().getEpochSecond(); final ServiceMetricEvent.Builder builder = builder(); MonitorUtils.addDimensionsToBuilder(builder, dimensions); @@ -68,6 +105,26 @@ public boolean doMonitor(ServiceEmitter emitter) computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs()) )); + long elapsedJiffiesSnapshotSecs = now - prevJiffiesSnapshotAt; + if (elapsedJiffiesSnapshotSecs > 0) { + prevJiffiesSnapshotAt = now; + final Map elapsedJiffies = jiffies.to( + "usage", + ImmutableMap.builder() + .put(USER, cpuSnapshot.getUserJiffies()) + .put(SYSTEM, cpuSnapshot.getSystemJiffies()) + .put(TOTAL, cpuSnapshot.getTotalJiffies()) + .build() + ); + if (elapsedJiffies != null) { + double totalUsagePct = 100.0 * elapsedJiffies.get(TOTAL) / userHz / elapsedJiffiesSnapshotSecs; + double sysUsagePct = 100.0 * elapsedJiffies.get(SYSTEM) / userHz / elapsedJiffiesSnapshotSecs; + double userUsagePct = 100.0 * elapsedJiffies.get(USER) / userHz / elapsedJiffiesSnapshotSecs; + emitter.emit(builder.setMetric(TOTAL_USAGE_METRIC, totalUsagePct)); + emitter.emit(builder.setMetric(SYS_USAGE_METRIC, sysUsagePct)); + emitter.emit(builder.setMetric(USER_USAGE_METRIC, userUsagePct)); + } + } return true; } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java new file mode 100644 index 000000000000..07ead5a68c26 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.Disk; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; + +import java.util.Map; + +public class CgroupDiskMonitor extends FeedDefiningMonitor +{ + final CgroupDiscoverer cgroupDiscoverer; + final Map dimensions; + private final KeyedDiff diff = new KeyedDiff(); + + public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + { + super(feed); + this.cgroupDiscoverer = cgroupDiscoverer; + this.dimensions = dimensions; + } + + public CgroupDiskMonitor(final Map dimensions, String feed) + { + this(new ProcSelfCgroupDiscoverer(), dimensions, feed); + } + + public CgroupDiskMonitor(final Map dimensions) + { + this(dimensions, DEFAULT_METRICS_FEED); + } + + public CgroupDiskMonitor() + { + this(ImmutableMap.of()); + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + Map snapshot = new Disk(cgroupDiscoverer).snapshot(); + for (Map.Entry entry : snapshot.entrySet()) { + final Map stats = diff.to( + entry.getKey(), + ImmutableMap.builder() + .put("cgroup/disk/read/bytes", entry.getValue().getReadBytes()) + .put("cgroup/disk/read/count", entry.getValue().getReadCount()) + .put("cgroup/disk/write/bytes", entry.getValue().getWriteBytes()) + .put("cgroup/disk/write/count", entry.getValue().getWriteCount()) + .build() + ); + + if (stats != null) { + final ServiceMetricEvent.Builder builder = builder() + .setDimension("diskName", entry.getValue().getDiskName()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + for (Map.Entry stat : stats.entrySet()) { + emitter.emit(builder.setMetric(stat.getKey(), stat.getValue())); + } + } + } + return true; + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java index d282cf5cddd0..442087198ae7 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.Memory; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import java.util.Map; @@ -42,7 +43,7 @@ public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { - this(null, dimensions, feed); + this(new ProcSelfCgroupDiscoverer(), dimensions, feed); } public CgroupMemoryMonitor(final Map dimensions) @@ -60,16 +61,18 @@ public boolean doMonitor(ServiceEmitter emitter) { final Memory memory = new Memory(cgroupDiscoverer); final Memory.MemoryStat stat = memory.snapshot(); + final ServiceMetricEvent.Builder builder = builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage())); + emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit())); + stat.getMemoryStats().forEach((key, value) -> { - final ServiceMetricEvent.Builder builder = builder(); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); // See https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt // There are inconsistent units for these. Most are bytes. emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory/%s", key), value)); }); stat.getNumaMemoryStats().forEach((key, value) -> { - final ServiceMetricEvent.Builder builder = builder().setDimension("numaZone", Long.toString(key)); - MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder().setDimension("numaZone", Long.toString(key)); value.forEach((k, v) -> emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory_numa/%s/pages", k), v))); }); return true; diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java index b9b29cdbdfcb..bd1e8ed20f55 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java @@ -19,10 +19,32 @@ package org.apache.druid.java.util.metrics; +import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; import java.util.regex.Pattern; public class CgroupUtil { + private static final Logger LOG = new Logger(CgroupUtil.class); public static final String SPACE_MATCH = Pattern.quote(" "); public static final String COMMA_MATCH = Pattern.quote(","); + + public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, String fileName, long defaultValue) + { + try { + List lines = Files.readAllLines(Paths.get(discoverer.discover(cgroup).toString(), fileName)); + return lines.stream().map(Longs::tryParse).filter(Objects::nonNull).findFirst().orElse(defaultValue); + } + catch (RuntimeException | IOException ex) { + LOG.warn(ex, "Unable to fetch %s", fileName); + return defaultValue; + } + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java index a742db2c3cb9..05228bda04d9 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java @@ -21,12 +21,13 @@ import com.google.common.primitives.Longs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.CgroupUtil; +import java.io.BufferedReader; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.List; -import java.util.Objects; +import java.util.regex.Pattern; /** * Collect CPU share and quota information from cpu cgroup files. @@ -35,6 +36,7 @@ public class Cpu { private static final Logger LOG = new Logger(Cpu.class); private static final String CGROUP = "cpu"; + private static final String CPUACCT_STAT_FILE = "cpuacct.stat"; private static final String CPU_SHARES_FILE = "cpu.shares"; private static final String CPU_QUOTA_FILE = "cpu.cfs_quota_us"; private static final String CPU_PERIOD_FILE = "cpu.cfs_period_us"; @@ -51,28 +53,43 @@ public Cpu(CgroupDiscoverer cgroupDiscoverer) * * @return A snapshot with the data populated. */ - public CpuAllocationMetric snapshot() + public CpuMetrics snapshot() { - return new CpuAllocationMetric( - readLongValue(CPU_SHARES_FILE, -1), - readLongValue(CPU_QUOTA_FILE, 0), - readLongValue(CPU_PERIOD_FILE, 0) - ); - } - - private long readLongValue(String fileName, long defaultValeue) - { - try { - List lines = Files.readAllLines(Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), fileName)); - return lines.stream().map(Longs::tryParse).filter(Objects::nonNull).findFirst().orElse(defaultValeue); + long userJiffies = -1L; + long systemJiffies = -1L; + try (final BufferedReader reader = Files.newBufferedReader( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CPUACCT_STAT_FILE) + )) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + final String[] parts = line.split(Pattern.quote(" ")); + if (parts.length != 2) { + // ignore + continue; + } + switch (parts[0]) { + case "user": + userJiffies = Longs.tryParse(parts[1]); + break; + case "system": + systemJiffies = Longs.tryParse(parts[1]); + break; + } + } } - catch (RuntimeException | IOException ex) { - LOG.error(ex, "Unable to fetch %s", fileName); - return defaultValeue; + catch (IOException | RuntimeException ex) { + LOG.error(ex, "Unable to fetch cpu snapshot"); } + + + return new CpuMetrics( + CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_SHARES_FILE, -1), + CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_QUOTA_FILE, 0), + CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_PERIOD_FILE, 0), + systemJiffies, userJiffies + ); } - public static class CpuAllocationMetric + public static class CpuMetrics { // Maps to cpu.shares - the share of CPU given to the process private final long shares; @@ -85,11 +102,19 @@ public static class CpuAllocationMetric // bandwidth decisions private final long periodUs; - CpuAllocationMetric(long shares, long quotaUs, long periodUs) + // Maps to user value at cpuacct.stat + private final long userJiffies; + + // Maps to system value at cpuacct.stat + private final long systemJiffies; + + CpuMetrics(long shares, long quotaUs, long periodUs, long systemJiffis, long userJiffies) { this.shares = shares; this.quotaUs = quotaUs; this.periodUs = periodUs; + this.userJiffies = userJiffies; + this.systemJiffies = systemJiffis; } public final long getShares() @@ -106,5 +131,20 @@ public final long getPeriodUs() { return periodUs; } + + public long getUserJiffies() + { + return userJiffies; + } + + public long getSystemJiffies() + { + return systemJiffies; + } + + public long getTotalJiffies() + { + return userJiffies + systemJiffies; + } } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Disk.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Disk.java new file mode 100644 index 000000000000..b74fb322ba95 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Disk.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; + +public class Disk +{ + private static final Logger LOG = new Logger(Disk.class); + private static final String CGROUP = "blkio"; + private static final String IO_SERVICED_FILE = "blkio.throttle.io_serviced"; + private static final String IO_SERVICE_BYTES_FILE = "blkio.throttle.io_service_bytes"; + private static final String READ = "Read"; + private static final String WRITE = "Write"; + private final CgroupDiscoverer cgroupDiscoverer; + + public Disk(CgroupDiscoverer cgroupDiscoverer) + { + this.cgroupDiscoverer = cgroupDiscoverer; + } + + /** + * Take a snapshot of cpu cgroup data + * + * @return A snapshot with the data populated. + */ + public Map snapshot() + { + Map statsByDisk = new HashMap<>(); + + try (final BufferedReader reader = Files.newBufferedReader( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), IO_SERVICED_FILE))) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + final String[] parts = line.split(Pattern.quote(" ")); + if (parts.length != 3) { + // ignore + continue; + } + Metrics metrics = statsByDisk.computeIfAbsent(parts[0], majorMinor -> new Metrics(majorMinor)); + switch (parts[1]) { + case WRITE: + metrics.writeCount = Longs.tryParse(parts[2]); + break; + case READ: + metrics.readCount = Longs.tryParse(parts[2]); + break; + } + } + } + catch (IOException | RuntimeException ex) { + LOG.error(ex, "Unable to fetch disk snapshot"); + } + + try (final BufferedReader reader = Files.newBufferedReader( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), IO_SERVICE_BYTES_FILE))) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + final String[] parts = line.split(Pattern.quote(" ")); + if (parts.length != 3) { + // ignore + continue; + } + Metrics metrics = statsByDisk.computeIfAbsent(parts[0], majorMinor -> new Metrics(majorMinor)); + switch (parts[1]) { + case WRITE: + metrics.writeBytes = Longs.tryParse(parts[2]); + break; + case READ: + metrics.readBytes = Longs.tryParse(parts[2]); + break; + } + } + } + catch (IOException | RuntimeException ex) { + LOG.error(ex, "Unable to fetch memory snapshot"); + } + + return statsByDisk; + } + + public static class Metrics + { + String diskName; + long readCount; + long writeCount; + long readBytes; + long writeBytes; + + public Metrics(String majorMinor) + { + try { + File deviceFile = new File("/sys/dev/block/" + majorMinor); + if (deviceFile.exists()) { + diskName = deviceFile.getCanonicalPath(); + } + } + catch (IOException e) { + LOG.warn("Unable to get disk name for " + majorMinor); + } + finally { + if (diskName == null) { + diskName = majorMinor; + } + } + } + + public long getReadCount() + { + return readCount; + } + public long getWriteCount() + { + return writeCount; + } + + public long getReadBytes() + { + return readBytes; + } + public long getWriteBytes() + { + return writeBytes; + } + public String getDiskName() + { + return diskName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Metrics metrics = (Metrics) o; + return readCount == metrics.readCount + && writeCount == metrics.writeCount + && readBytes == metrics.readBytes + && writeBytes == metrics.writeBytes + && Objects.equals(diskName, metrics.diskName); + } + + @Override + public int hashCode() + { + return Objects.hash(diskName, readCount, writeCount, readBytes, writeBytes); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Memory.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Memory.java index bc4487bb478d..01e7fed9c9f5 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Memory.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Memory.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Longs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.CgroupUtil; import java.io.BufferedReader; import java.io.IOException; @@ -36,6 +37,8 @@ public class Memory private static final Logger LOG = new Logger(Memory.class); private static final String CGROUP = "memory"; private static final String CGROUP_MEMORY_FILE = "memory.stat"; + private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes"; + private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes"; private static final String CGROUP_MEMORY_NUMA_FILE = "memory.numa_stat"; private final CgroupDiscoverer cgroupDiscoverer; @@ -47,6 +50,8 @@ public Memory(CgroupDiscoverer cgroupDiscoverer) public MemoryStat snapshot() { final MemoryStat memoryStat = new MemoryStat(); + memoryStat.usage = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_USAGE_FILE, -1); + memoryStat.limit = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, MEMORY_LIMIT_FILE, -1); try (final BufferedReader reader = Files.newBufferedReader( Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CGROUP_MEMORY_FILE) @@ -102,6 +107,8 @@ public static class MemoryStat { private final Map memoryStats = new HashMap<>(); private final Map> numaMemoryStats = new HashMap<>(); + private long usage; + private long limit; public Map getMemoryStats() { @@ -113,5 +120,15 @@ public Map> getNumaMemoryStats() // They can modify the inner map... but why? return ImmutableMap.copyOf(numaMemoryStats); } + + public long getUsage() + { + return usage; + } + + public long getLimit() + { + return limit; + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java index d6379840a495..04f147481288 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java @@ -20,6 +20,7 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; @@ -36,6 +37,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class CgroupCpuMonitorTest { @@ -45,6 +47,7 @@ public class CgroupCpuMonitorTest public TemporaryFolder temporaryFolder = new TemporaryFolder(); private File procDir; private File cgroupDir; + private File statFile; private CgroupDiscoverer discoverer; @Before @@ -60,13 +63,15 @@ public void setUp() throws IOException ); FileUtils.mkdirp(cpuDir); + statFile = new File(cpuDir, "cpuacct.stat"); TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares")); TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us")); TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us")); + TestUtils.copyOrReplaceResource("/cpuacct.stat", statFile); } @Test - public void testMonitor() + public void testMonitor() throws IOException, InterruptedException { final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed"); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); @@ -79,6 +84,26 @@ public void testMonitor() Assert.assertEquals(1024L, sharesEvent.get("value")); Assert.assertEquals("cgroup/cpu/cores_quota", coresEvent.get("metric")); Assert.assertEquals(3.0D, coresEvent.get("value")); + emitter.flush(); + + TestUtils.copyOrReplaceResource("/cpuacct.stat-2", statFile); + // We need to pass atleast a second for the calculation to trigger + // to avoid divide by zero. + Thread.sleep(1000); + + Assert.assertTrue(monitor.doMonitor(emitter)); + Assert.assertTrue( + emitter + .getEvents() + .stream() + .map(e -> e.toMap().get("metric")) + .collect(Collectors.toList()) + .containsAll( + ImmutableSet.of( + CgroupCpuMonitor.TOTAL_USAGE_METRIC, + CgroupCpuMonitor.USER_USAGE_METRIC, + CgroupCpuMonitor.SYS_USAGE_METRIC + ))); } @Test diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java new file mode 100644 index 000000000000..21a834587292 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.TestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class CgroupDiskMonitorTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private File procDir; + private File cgroupDir; + private File servicedFile; + private File serviceBytesFile; + private CgroupDiscoverer discoverer; + + @Before + public void setUp() throws IOException + { + cgroupDir = temporaryFolder.newFolder(); + procDir = temporaryFolder.newFolder(); + discoverer = new ProcCgroupDiscoverer(procDir.toPath()); + TestUtils.setUpCgroups(procDir, cgroupDir); + final File blkioDir = new File( + cgroupDir, + "blkio/system.slice/some.service/" + ); + + FileUtils.mkdirp(blkioDir); + servicedFile = new File(blkioDir, "blkio.throttle.io_serviced"); + serviceBytesFile = new File(blkioDir, "blkio.throttle.io_service_bytes"); + TestUtils.copyResource("/blkio.throttle.io_service_bytes", serviceBytesFile); + TestUtils.copyResource("/blkio.throttle.io_serviced", servicedFile); + } + + @Test + public void testMonitor() throws IOException + { + final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed"); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + Assert.assertTrue(monitor.doMonitor(emitter)); + Assert.assertEquals(0, emitter.getEvents().size()); + + TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", serviceBytesFile); + TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", servicedFile); + + Assert.assertTrue(monitor.doMonitor(emitter)); + Assert.assertEquals(8, emitter.getEvents().size()); + Assert.assertTrue( + emitter + .getEvents() + .stream() + .map(e -> e.toMap().get("value")) + .allMatch(val -> Long.valueOf(10).equals(val))); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java index 39ff532a5a34..7827368ec9f5 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java @@ -61,6 +61,8 @@ public void setUp() throws IOException FileUtils.mkdirp(memoryDir); TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat")); TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat")); + TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes")); + TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes")); } @Test @@ -70,6 +72,6 @@ public void testMonitor() final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); final List actualEvents = emitter.getEvents(); - Assert.assertEquals(44, actualEvents.size()); + Assert.assertEquals(46, actualEvents.size()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java index 817b4664c27f..1af0ba427bf9 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java @@ -52,6 +52,8 @@ public void setUp() throws IOException TestUtils.copyOrReplaceResource("/cpu.shares", new File(cpuDir, "cpu.shares")); TestUtils.copyOrReplaceResource("/cpu.cfs_quota_us", new File(cpuDir, "cpu.cfs_quota_us")); TestUtils.copyOrReplaceResource("/cpu.cfs_period_us", new File(cpuDir, "cpu.cfs_period_us")); + TestUtils.copyOrReplaceResource("/cpuacct.usage", new File(cpuDir, "cpuacct.usage")); + TestUtils.copyOrReplaceResource("/cpuacct.stat", new File(cpuDir, "cpuacct.stat")); } @Test @@ -60,19 +62,24 @@ public void testWontCrash() final Cpu cpu = new Cpu(cgroup -> { throw new RuntimeException("Should still continue"); }); - final Cpu.CpuAllocationMetric metric = cpu.snapshot(); + final Cpu.CpuMetrics metric = cpu.snapshot(); Assert.assertEquals(-1L, metric.getShares()); Assert.assertEquals(0, metric.getQuotaUs()); Assert.assertEquals(0, metric.getPeriodUs()); + Assert.assertEquals(-1L, metric.getSystemJiffies()); + Assert.assertEquals(-1L, metric.getUserJiffies()); } @Test public void testSimpleLoad() { final Cpu cpu = new Cpu(discoverer); - final Cpu.CpuAllocationMetric snapshot = cpu.snapshot(); + final Cpu.CpuMetrics snapshot = cpu.snapshot(); Assert.assertEquals(1024, snapshot.getShares()); Assert.assertEquals(300000, snapshot.getQuotaUs()); Assert.assertEquals(100000, snapshot.getPeriodUs()); + Assert.assertEquals(143871L, snapshot.getSystemJiffies()); + Assert.assertEquals(251183L, snapshot.getUserJiffies()); + Assert.assertEquals(395054L, snapshot.getTotalJiffies()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java new file mode 100644 index 000000000000..514b4cea9ea1 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.FileUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Map; + +public class DiskTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private File procDir; + private File cgroupDir; + private CgroupDiscoverer discoverer; + + @Before + public void setUp() throws Exception + { + cgroupDir = temporaryFolder.newFolder(); + procDir = temporaryFolder.newFolder(); + discoverer = new ProcCgroupDiscoverer(procDir.toPath()); + TestUtils.setUpCgroups(procDir, cgroupDir); + final File blkioDir = new File( + cgroupDir, + "blkio/system.slice/some.service" + ); + + FileUtils.mkdirp(blkioDir); + TestUtils.copyResource("/blkio.throttle.io_serviced", new File(blkioDir, "blkio.throttle.io_serviced")); + TestUtils.copyResource("/blkio.throttle.io_service_bytes", new File(blkioDir, "blkio.throttle.io_service_bytes")); + } + + @Test + public void testWontCrash() + { + final Disk disk = new Disk((cgroup) -> { + throw new RuntimeException("shouldContinue"); + }); + final Map stats = disk.snapshot(); + Assert.assertEquals(ImmutableMap.of(), stats); + } + + @Test + public void testSimpleSnapshot() + { + final Map stats = new Disk(discoverer).snapshot(); + Assert.assertEquals(ImmutableSet.of("259:0", "259:7"), stats.keySet()); + + Assert.assertEquals(stats.get("259:0").getReadCount(), 98L); + Assert.assertEquals(stats.get("259:0").getWriteCount(), 756L); + Assert.assertEquals(stats.get("259:0").getReadBytes(), 55000L); + Assert.assertEquals(stats.get("259:0").getWriteBytes(), 6208512L); + + Assert.assertEquals(stats.get("259:7").getReadCount(), 26L); + Assert.assertEquals(stats.get("259:7").getWriteCount(), 0L); + Assert.assertEquals(stats.get("259:7").getReadBytes(), 1773568L); + Assert.assertEquals(stats.get("259:7").getWriteBytes(), 0L); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java index 87e68d521d68..bd5b10c7225a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java @@ -57,6 +57,8 @@ public void setUp() throws Exception FileUtils.mkdirp(memoryDir); TestUtils.copyResource("/memory.stat", new File(memoryDir, "memory.stat")); TestUtils.copyResource("/memory.numa_stat", new File(memoryDir, "memory.numa_stat")); + TestUtils.copyResource("/memory.usage_in_bytes", new File(memoryDir, "memory.usage_in_bytes")); + TestUtils.copyResource("/memory.limit_in_bytes", new File(memoryDir, "memory.limit_in_bytes")); } @Test @@ -75,6 +77,10 @@ public void testSimpleSnapshot() { final Memory memory = new Memory(discoverer); final Memory.MemoryStat stat = memory.snapshot(); + + Assert.assertEquals(5000000, stat.getUsage()); + Assert.assertEquals(8000000, stat.getLimit()); + final Map expectedMemoryStats = new HashMap<>(); expectedMemoryStats.put("inactive_anon", 0L); expectedMemoryStats.put("total_pgfault", 13137L); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java index e522ff679621..1aca9e7f7c3f 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.StandardCopyOption; public class TestUtils { @@ -65,7 +66,7 @@ public static void copyResource(String resource, File out) throws IOException public static void copyOrReplaceResource(String resource, File out) throws IOException { - Files.copy(TestUtils.class.getResourceAsStream(resource), out.toPath()); + Files.copy(TestUtils.class.getResourceAsStream(resource), out.toPath(), StandardCopyOption.REPLACE_EXISTING); Assert.assertTrue(out.exists()); Assert.assertNotEquals(0, out.length()); } diff --git a/processing/src/test/resources/blkio.throttle.io_service_bytes b/processing/src/test/resources/blkio.throttle.io_service_bytes new file mode 100644 index 000000000000..66f285b698f8 --- /dev/null +++ b/processing/src/test/resources/blkio.throttle.io_service_bytes @@ -0,0 +1,13 @@ +259:0 Read 55000 +259:0 Write 6208512 +259:0 Sync 6208512 +259:0 Async 0 +259:0 Discard 0 +259:0 Total 6263512 +259:7 Read 1773568 +259:7 Write 0 +259:7 Sync 1773568 +259:7 Async 0 +259:7 Discard 0 +259:7 Total 1773568 +Total 8037080 \ No newline at end of file diff --git a/processing/src/test/resources/blkio.throttle.io_service_bytes-2 b/processing/src/test/resources/blkio.throttle.io_service_bytes-2 new file mode 100644 index 000000000000..b17b7335354e --- /dev/null +++ b/processing/src/test/resources/blkio.throttle.io_service_bytes-2 @@ -0,0 +1,13 @@ +259:0 Read 55010 +259:0 Write 6208522 +259:0 Sync 6208522 +259:0 Async 0 +259:0 Discard 0 +259:0 Total 6263532 +259:7 Read 1773578 +259:7 Write 10 +259:7 Sync 1773588 +259:7 Async 0 +259:7 Discard 0 +259:7 Total 1773588 +Total 8037120 \ No newline at end of file diff --git a/processing/src/test/resources/blkio.throttle.io_serviced b/processing/src/test/resources/blkio.throttle.io_serviced new file mode 100644 index 000000000000..7c0a9f9adfff --- /dev/null +++ b/processing/src/test/resources/blkio.throttle.io_serviced @@ -0,0 +1,13 @@ +259:0 Read 98 +259:0 Write 756 +259:0 Sync 854 +259:0 Async 0 +259:0 Discard 0 +259:0 Total 854 +259:7 Read 26 +259:7 Write 0 +259:7 Sync 26 +259:7 Async 0 +259:7 Discard 0 +259:7 Total 26 +Total 880 \ No newline at end of file diff --git a/processing/src/test/resources/blkio.throttle.io_serviced-2 b/processing/src/test/resources/blkio.throttle.io_serviced-2 new file mode 100644 index 000000000000..5d41b635d94e --- /dev/null +++ b/processing/src/test/resources/blkio.throttle.io_serviced-2 @@ -0,0 +1,13 @@ +259:0 Read 108 +259:0 Write 766 +259:0 Sync 874 +259:0 Async 0 +259:0 Discard 0 +259:0 Total 874 +259:7 Read 36 +259:7 Write 10 +259:7 Sync 46 +259:7 Async 0 +259:7 Discard 0 +259:7 Total 46 +Total 920 \ No newline at end of file diff --git a/processing/src/test/resources/cpuacct.stat b/processing/src/test/resources/cpuacct.stat new file mode 100644 index 000000000000..b049b71ef134 --- /dev/null +++ b/processing/src/test/resources/cpuacct.stat @@ -0,0 +1,2 @@ +user 251183 +system 143871 \ No newline at end of file diff --git a/processing/src/test/resources/cpuacct.stat-2 b/processing/src/test/resources/cpuacct.stat-2 new file mode 100644 index 000000000000..c93ee183c215 --- /dev/null +++ b/processing/src/test/resources/cpuacct.stat-2 @@ -0,0 +1,2 @@ +user 251208 +system 143896 \ No newline at end of file diff --git a/processing/src/test/resources/cpuacct.usage b/processing/src/test/resources/cpuacct.usage new file mode 100644 index 000000000000..d4375a28ae7d --- /dev/null +++ b/processing/src/test/resources/cpuacct.usage @@ -0,0 +1 @@ +5000000 \ No newline at end of file diff --git a/processing/src/test/resources/memory.limit_in_bytes b/processing/src/test/resources/memory.limit_in_bytes new file mode 100644 index 000000000000..10f38ec0043d --- /dev/null +++ b/processing/src/test/resources/memory.limit_in_bytes @@ -0,0 +1 @@ +8000000 \ No newline at end of file diff --git a/processing/src/test/resources/memory.usage_in_bytes b/processing/src/test/resources/memory.usage_in_bytes new file mode 100644 index 000000000000..d4375a28ae7d --- /dev/null +++ b/processing/src/test/resources/memory.usage_in_bytes @@ -0,0 +1 @@ +5000000 \ No newline at end of file diff --git a/website/.spelling b/website/.spelling index a14f233539c3..b2167dbce921 100644 --- a/website/.spelling +++ b/website/.spelling @@ -277,6 +277,7 @@ backpressure base64 big-endian bigint +blkio blobstore Boolean boolean