From cb5c4ec8a4feff875adeedd0740df2dc8d538cb3 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 21 Apr 2026 18:52:07 +0000 Subject: [PATCH 1/2] improves monitor tserver view Made a few major changes in the this PR all in support of providing an improved tserver page on the monitor. * Gave the RPC thread a consistent name across all server types. This was done to make the thread name findable in a metrics tag using a constant. * Setup a custom monitor metrics registry. This was done because it may not be safe to read from registry in another thread (see micrometer-metrics/micrometer#7417) AND more importantly to get step functionality where metrics like function counters show the delta for the last 30 seconds. * Refactored the SeversView code to be more flexible. It used to directly compute data from a a single metric. Now its easier to do arbitrary reductions on a collection of metrics for the data in a column. * Started collecting executor metrics on thread pools and used those to create some of the tserver columns in the monitor. Using the metrics requires looking for specific thread pool names in the tags. * Added a new meric to track scan errors. * Fixed some incorrect metrics types. --- .../apache/accumulo/core/metrics/Metric.java | 78 ++-- .../core/metrics/MonitorMeterRegistry.java | 63 +++ .../core/util/threads/ThreadPoolNames.java | 2 + .../accumulo/server/AbstractServer.java | 15 +- .../server/metrics/MetricsInfoImpl.java | 7 + .../accumulo/server/rpc/TServerUtils.java | 88 ++-- .../accumulo/server/rpc/TServerUtilsTest.java | 2 +- .../apache/accumulo/compactor/Compactor.java | 5 +- .../org/apache/accumulo/manager/Manager.java | 2 +- .../monitor/next/views/ServersView.java | 399 +++++++++++++++--- .../resources/js/server_process_common.js | 32 +- .../apache/accumulo/tserver/ScanServer.java | 4 +- .../apache/accumulo/tserver/TabletServer.java | 4 +- .../tserver/TabletServerResourceManager.java | 14 +- .../metrics/TabletServerScanMetrics.java | 14 +- .../accumulo/tserver/scan/LookupTask.java | 2 + .../accumulo/tserver/scan/NextBatchTask.java | 2 + 17 files changed, 566 insertions(+), 167 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/metrics/MonitorMeterRegistry.java diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index 633a149bd89..c00c1fb16ed 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -210,10 +210,10 @@ public enum Metric { "Total number of minor compactions performed.", MetricDocSection.COMPACTION, "Minc Completed", null, NUMBER), TSERVER_TABLETS_ONLINE("accumulo.tablets.online", MetricType.GAUGE, "Number of online tablets.", - MetricDocSection.TABLET_SERVER, "Tablets Online", null, NUMBER), + MetricDocSection.TABLET_SERVER, "Tablets", null, NUMBER), TSERVER_TABLETS_LONG_ASSIGNMENTS("accumulo.tablets.assignments.warning", MetricType.GAUGE, "Number of tablet assignments that are taking longer than the configured warning duration.", - MetricDocSection.TABLET_SERVER, "Tablet Assignments Overdue", null, NUMBER), + MetricDocSection.TABLET_SERVER, "Slow Assignments", null, NUMBER), TSERVER_TABLETS_OPENING("accumulo.tablets.opening", MetricType.GAUGE, "Number of opening tablets.", MetricDocSection.TABLET_SERVER, "Tablets Opening", null, NUMBER), @@ -228,9 +228,10 @@ public enum Metric { TSERVER_INGEST_BYTES("accumulo.ingest.bytes", MetricType.GAUGE, "Ingest byte count. The rate can be derived from this metric.", MetricDocSection.TABLET_SERVER, "Bytes Ingested", null, BYTES), + // TODO does this duration expect millis in javascript? TSERVER_HOLD("accumulo.ingest.hold", MetricType.GAUGE, "Duration for which commits have been held in milliseconds.", MetricDocSection.TABLET_SERVER, - "Ingest Commit Hold Time", null, NUMBER), + "Hold Time", null, DURATION), TSERVER_TABLETS_ONLINE_ONDEMAND("accumulo.tablets.ondemand.online", MetricType.GAUGE, "Number of online on-demand tablets", MetricDocSection.TABLET_SERVER, "Online On-Demand Tablets", null, NUMBER), @@ -253,46 +254,50 @@ public enum Metric { null, NUMBER), // Scan Metrics - SCAN_BUSY_TIMEOUT_COUNT("accumulo.scan.busy.timeout.count", MetricType.COUNTER, + SCAN_BUSY_TIMEOUT_COUNT("accumulo.scan.busy.timeout.count", MetricType.FUNCTION_COUNTER, "Count of the scans where a busy timeout happened.", MetricDocSection.SCAN, "Scan Busy Count", null, NUMBER), SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "Scan session lifetime (creation to close).", MetricDocSection.SCAN, "Scan Session Total Time", null, NUMBER), SCAN_OPEN_FILES("accumulo.scan.files.open", MetricType.GAUGE, "Number of files open for scans.", MetricDocSection.SCAN, "Scan Files Open", null, NUMBER), - SCAN_RESULTS("accumulo.scan.result", MetricType.GAUGE, "Results per scan.", MetricDocSection.SCAN, - "Scan Result Count", null, NUMBER), - SCAN_YIELDS("accumulo.scan.yields", MetricType.GAUGE, "Counts scans that have yielded.", - MetricDocSection.SCAN, "Scan Yield Count", null, NUMBER), - SCAN_START("accumulo.scan.start", MetricType.COUNTER, + SCAN_RESULTS("accumulo.scan.result", MetricType.DISTRIBUTION_SUMMARY, "Results per scan.", + MetricDocSection.SCAN, "Scan Result Count", null, NUMBER), + SCAN_YIELDS("accumulo.scan.yields", MetricType.DISTRIBUTION_SUMMARY, + "Counts scans that have yielded.", MetricDocSection.SCAN, "Scan Yield Count", null, NUMBER), + SCAN_START("accumulo.scan.start", MetricType.FUNCTION_COUNTER, "Number of calls to start a scan or multiscan.", MetricDocSection.SCAN, "Scan Start Count", null, NUMBER), - SCAN_CONTINUE("accumulo.scan.continue", MetricType.COUNTER, + SCAN_CONTINUE("accumulo.scan.continue", MetricType.FUNCTION_COUNTER, "Number of calls to continue a scan or multiscan.", MetricDocSection.SCAN, "Scan Continue Count", null, NUMBER), - SCAN_CLOSE("accumulo.scan.close", MetricType.COUNTER, + SCAN_CLOSE("accumulo.scan.close", MetricType.FUNCTION_COUNTER, "Number of calls to close a scan or multiscan.", MetricDocSection.SCAN, "Scan Close Count", null, NUMBER), - SCAN_QUERIES("accumulo.scan.queries", MetricType.GAUGE, "Number of queries made during scans.", - MetricDocSection.SCAN, "Tablet Lookup Count", null, NUMBER), - SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.GAUGE, + SCAN_QUERIES("accumulo.scan.queries", MetricType.FUNCTION_COUNTER, + "Number of queries made during scans.", MetricDocSection.SCAN, "Tablet Lookup Count", null, + NUMBER), + SCAN_SCANNED_ENTRIES("accumulo.scan.query.scanned.entries", MetricType.FUNCTION_COUNTER, "Count of scanned entries. The rate can be derived from this metric.", MetricDocSection.SCAN, - "Scanned Entry Count", null, NUMBER), - SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.GAUGE, + "Scanned Entries", null, NUMBER), + SCAN_QUERY_SCAN_RESULTS("accumulo.scan.query.results", MetricType.FUNCTION_COUNTER, "Query count. The rate can be derived from this metric.", MetricDocSection.SCAN, - "Returned Entry Count", null, NUMBER), - SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.GAUGE, + "Returned Entries", null, NUMBER), + SCAN_QUERY_SCAN_RESULTS_BYTES("accumulo.scan.query.results.bytes", MetricType.FUNCTION_COUNTER, "Query byte count. The rate can be derived from this metric.", MetricDocSection.SCAN, - "Returned Bytes Count", null, BYTES), - SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.COUNTER, + "Returned Bytes", null, BYTES), + SCAN_PAUSED_FOR_MEM("accumulo.scan.paused.for.memory", MetricType.FUNCTION_COUNTER, "Count of scans paused due to server being low on memory.", MetricDocSection.SCAN, "Scans Paused For Low Memory", null, NUMBER), - SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.COUNTER, + SCAN_RETURN_FOR_MEM("accumulo.scan.return.early.for.memory", MetricType.FUNCTION_COUNTER, "Count of scans that returned results early due to server being low on memory.", MetricDocSection.SCAN, "Scans Returned Early For Low Memory", null, NUMBER), SCAN_ZOMBIE_THREADS("accumulo.scan.zombie.threads", MetricType.GAUGE, "Number of scan threads that have no associated client session.", MetricDocSection.SCAN, "Scan Zombie Thread Count", null, NUMBER), + SCAN_ERRORS("accumulo.scan.errors", MetricType.FUNCTION_COUNTER, + "Number of scan task that had an exception.", MetricDocSection.SCAN, "Failed scans", null, + NUMBER), // Major Compaction Metrics MAJC_PAUSED("accumulo.compaction.majc.paused", MetricType.COUNTER, @@ -334,23 +339,22 @@ public enum Metric { // Block Cache Metrics BLOCKCACHE_INDEX_HITCOUNT("accumulo.blockcache.index.hitcount", MetricType.FUNCTION_COUNTER, - "Index block cache hit count.", MetricDocSection.BLOCK_CACHE, "Index Block Cache Hit Count", - null, NUMBER), + "Index block cache hit count.", MetricDocSection.BLOCK_CACHE, "Index Cache Hit", null, + NUMBER), BLOCKCACHE_INDEX_REQUESTCOUNT("accumulo.blockcache.index.requestcount", MetricType.FUNCTION_COUNTER, "Index block cache request count.", MetricDocSection.BLOCK_CACHE, - "Index Block Cache Request Count", null, NUMBER), + "Index Cache Request", null, NUMBER), BLOCKCACHE_INDEX_EVICTIONCOUNT("accumulo.blockcache.index.evictioncount", MetricType.FUNCTION_COUNTER, "Index block cache eviction count.", - MetricDocSection.BLOCK_CACHE, "Index Block Cache Eviction Count", null, NUMBER), + MetricDocSection.BLOCK_CACHE, "Index Cache Eviction", null, NUMBER), BLOCKCACHE_DATA_HITCOUNT("accumulo.blockcache.data.hitcount", MetricType.FUNCTION_COUNTER, - "Data block cache hit count.", MetricDocSection.BLOCK_CACHE, "Data Block Cache Hit Count", - null, NUMBER), + "Data block cache hit count.", MetricDocSection.BLOCK_CACHE, "Data Cache Hit", null, NUMBER), BLOCKCACHE_DATA_REQUESTCOUNT("accumulo.blockcache.data.requestcount", MetricType.FUNCTION_COUNTER, - "Data block cache request count.", MetricDocSection.BLOCK_CACHE, - "Data Block Cache Request Count", null, NUMBER), + "Data block cache request count.", MetricDocSection.BLOCK_CACHE, "Data Cache Request", null, + NUMBER), BLOCKCACHE_DATA_EVICTIONCOUNT("accumulo.blockcache.data.evictioncount", MetricType.FUNCTION_COUNTER, "Data block cache eviction count.", MetricDocSection.BLOCK_CACHE, - "Data Block Cache Eviction Count", null, NUMBER), + "Data Cache Eviction", null, NUMBER), BLOCKCACHE_SUMMARY_HITCOUNT("accumulo.blockcache.summary.hitcount", MetricType.FUNCTION_COUNTER, "Summary block cache hit count.", MetricDocSection.BLOCK_CACHE, "Summary Block Cache Hit Count", null, NUMBER), @@ -387,16 +391,26 @@ public enum Metric { MetricDocSection.GENERAL_SERVER, "Tablet Recovery Longest Time", null, DURATION), RECOVERIES_AVG_PROGRESS("accumulo.recoveries.avg.progress", MetricType.GAUGE, "The average percentage (0.0 - 99.9) of the in progress recoveries.", - MetricDocSection.GENERAL_SERVER, "Tablet Recovery Avg Percent Complete", null, PERCENT); + MetricDocSection.GENERAL_SERVER, "Tablet Recovery Avg Percent Complete", null, PERCENT), + + // Executor metrics + EXECUTOR_COMPLETED("executor.completed", MetricType.FUNCTION_COUNTER, + "Task completed by a thread pool. Each thread pool emits this metric w/ a different tag.", + MetricDocSection.GENERAL_SERVER, "Completed task", null, NUMBER), + EXECUTOR_QUEUED("executor.queued", MetricType.GAUGE, + "Task queued for a thread pool. Each thread pool emits this metric w/ a different tag.", + MetricDocSection.GENERAL_SERVER, "Queued task", null, NUMBER); - public static enum MonitorCssClass { + public enum MonitorCssClass { BYTES("big-size"), + BYTES_RATE("rate-size"), DATE_END("end-date"), DATE_START("start-date"), DURATION("duration"), IDLE_STATE("idle-state"), MEMORY_STATE("memory-state"), NUMBER("big-num"), + RATE("rate-num"), PERCENT("percent"); private final String cssClass; diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MonitorMeterRegistry.java b/core/src/main/java/org/apache/accumulo/core/metrics/MonitorMeterRegistry.java new file mode 100644 index 00000000000..40049ed0870 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MonitorMeterRegistry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metrics; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.step.StepMeterRegistry; +import io.micrometer.core.instrument.step.StepRegistryConfig; + +public class MonitorMeterRegistry extends StepMeterRegistry { + + public static final Duration STEP = Duration.ofSeconds(30); + + private static final StepRegistryConfig CONFIG = new StepRegistryConfig() { + + @Override + public String prefix() { + return "monitor"; + } + + @Override + public String get(String key) { + return null; + } + + @Override + public Duration step() { + return STEP; + } + }; + + public MonitorMeterRegistry() { + super(CONFIG, Clock.SYSTEM); + } + + @Override + protected void publish() { + + } + + @Override + protected TimeUnit getBaseTimeUnit() { + return TimeUnit.MILLISECONDS; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index f566fb2f03d..a379f02b179 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -47,6 +47,8 @@ public enum ThreadPoolNames { MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"), METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"), + RPC_POOL("accumulo.pool.rpc"), + SCAN_EXECUTOR_PREFIX("accumulo.pool.scan.exec."), SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"), SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"), SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"), diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 1cadeabaa33..052df4d1205 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -53,6 +53,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.mem.LowMemoryDetector; import org.apache.accumulo.server.metrics.MetricResponseWrapper; +import org.apache.accumulo.server.metrics.MetricsInfoImpl; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.security.SecurityUtil; @@ -67,7 +68,6 @@ import com.google.flatbuffers.FlatBufferBuilder; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Metrics; public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable, ServerProcessService.Iface { @@ -99,6 +99,7 @@ public static void startServer(AbstractServer server, Logger LOG) throws Excepti private final AtomicBoolean closed = new AtomicBoolean(false); private final Set monitorMetricExclusions; + @SuppressWarnings("deprecation") protected AbstractServer(ServerId.Type serverType, ServerOpts opts, BiFunction serverContextFactory, String[] args) { @@ -106,7 +107,8 @@ protected AbstractServer(ServerId.Type serverType, ServerOpts opts, this.applicationName = serverType.name(); opts.parseArgs(applicationName, args); var siteConfig = opts.getSiteConfiguration(); - final String newBindParameter = siteConfig.get(Property.RPC_PROCESS_BIND_ADDRESS); + final String newBindParameter = siteConfig.get(siteConfig + .resolve(Property.RPC_PROCESS_BIND_ADDRESS, Property.GENERAL_PROCESS_BIND_ADDRESS)); // If new bind parameter passed on command line or in file, then use it. if (newBindParameter != null && !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { @@ -404,9 +406,12 @@ public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws T response.setResourceGroup(getResourceGroup().canonical()); response.setTimestamp(System.currentTimeMillis()); - if (context.getMetricsInfo().isMetricsEnabled()) { - Metrics.globalRegistry.getMeters().forEach(m -> { - if (m.getId().getName().startsWith("accumulo.")) { + var registry = MetricsInfoImpl.MONITOR_REGISTRY.get(); + if (registry != null) { + registry.getMeters().forEach(m -> { + if (m.getId().getName().startsWith("accumulo.") + || m.getId().getName().equals(Metric.EXECUTOR_COMPLETED.getName()) + || m.getId().getName().equals(Metric.EXECUTOR_QUEUED.getName())) { if (!this.monitorMetricExclusions.contains(m.getId().getName())) { m.match(response::writeMeter, response::writeMeter, response::writeTimer, response::writeDistributionSummary, response::writeLongTaskTimer, diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java index 60f48b6b292..1dbadab0317 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -25,11 +25,13 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.metrics.MonitorMeterRegistry; import org.apache.accumulo.core.spi.metrics.MeterRegistryFactory; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; @@ -67,6 +69,8 @@ public class MetricsInfoImpl implements MetricsInfo { private final List producers = new ArrayList<>(); + public static final AtomicReference MONITOR_REGISTRY = new AtomicReference<>(); + public MetricsInfoImpl(final ServerContext context) { this.context = context; metricsEnabled = context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_ENABLED); @@ -164,6 +168,9 @@ public synchronized void init(Collection tags) { } } + MONITOR_REGISTRY.set(new MonitorMeterRegistry()); + Metrics.globalRegistry.add(MONITOR_REGISTRY.get()); + // Set the MeterRegistry on the ThreadPools ThreadPools.getServerThreadPools().setMeterRegistry(Metrics.globalRegistry); diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index a83ca11c46e..a601eb8a66c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.RPC_POOL; import java.io.IOException; import java.net.InetAddress; @@ -99,7 +99,6 @@ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) { * @param context RPC configuration * @param portHintProperty the port to attempt to open, can be zero, meaning "any available port" * @param processor the service to be started - * @param serverName the name of the class that is providing the service * @param minThreadProperty A Property to control the minimum number of threads in the pool * @param timeBetweenThreadChecksProperty A Property to control the amount of time between checks * to resize the thread pool @@ -107,9 +106,9 @@ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) { * @throws UnknownHostException when we don't know our own address */ public static ServerAddress createThriftServer(ServerContext context, String hostname, - Property portHintProperty, TProcessor processor, String serverName, - Property minThreadProperty, Property threadTimeOutProperty, - Property timeBetweenThreadChecksProperty) throws UnknownHostException { + Property portHintProperty, TProcessor processor, Property minThreadProperty, + Property threadTimeOutProperty, Property timeBetweenThreadChecksProperty) + throws UnknownHostException { final AccumuloConfiguration config = context.getConfiguration(); final IntStream portHint = config.getPortStream(portHintProperty); @@ -147,7 +146,7 @@ public static ServerAddress createThriftServer(ServerContext context, String hos HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { return TServerUtils.createThriftServer(serverType, timedProcessor, context.getInstanceID(), - serverName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, + minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), backlog, addresses); } catch (TTransportException e) { @@ -161,9 +160,9 @@ public static ServerAddress createThriftServer(ServerContext context, String hos * dynamically resize itself. */ private static ServerAddress createThreadedSelectorServer(HostAndPort address, - TProcessor processor, TProtocolFactory protocolFactory, final String serverName, - final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, - long timeBetweenThreadChecks, long maxMessageSize, int backlog) throws TTransportException { + TProcessor processor, TProtocolFactory protocolFactory, final int numThreads, + final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, + long maxMessageSize, int backlog) throws TTransportException { NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs() .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort())) @@ -181,8 +180,8 @@ private static ServerAddress createThreadedSelectorServer(HostAndPort address, options.stopTimeoutVal(5); // Create our own very special thread pool. - ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, - conf, timeBetweenThreadChecks); + ThreadPoolExecutor pool = + createSelfResizingThreadPool(numThreads, threadTimeOut, conf, timeBetweenThreadChecks); options.executorService(pool); options.processorFactory(new TProcessorFactory(processor)); @@ -199,9 +198,9 @@ private static ServerAddress createThreadedSelectorServer(HostAndPort address, * dynamically resize itself. */ private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, - TProtocolFactory protocolFactory, final String serverName, final int numThreads, - final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, - long maxMessageSize, int backlog) throws TTransportException { + TProtocolFactory protocolFactory, final int numThreads, final long threadTimeOut, + final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, + int backlog) throws TTransportException { NonblockingAbstractServerSocketArgs args = new NonblockingAbstractServerSocketArgs() .backlog(backlog).bindAddr(new InetSocketAddress(address.getHost(), address.getPort())) @@ -216,8 +215,8 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce options.stopTimeoutVal(5); // Create our own very special thread pool. - ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, - conf, timeBetweenThreadChecks); + ThreadPoolExecutor pool = + createSelfResizingThreadPool(numThreads, threadTimeOut, conf, timeBetweenThreadChecks); options.executorService(pool); options.processorFactory(new TProcessorFactory(processor)); @@ -234,7 +233,6 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce * core pool size and number of active threads of the {@link ThreadPoolExecutor} and increase or * decrease the core pool size based on activity (excessive or lack thereof). * - * @param serverName A name to describe the thrift server this executor will service * @param executorThreads The minimum number of threads for the executor * @param threadTimeOut The time after which threads are allowed to terminate including core * threads. If set to 0, the core threads will indefinitely stay running waiting for work. @@ -243,10 +241,9 @@ private static ServerAddress createNonBlockingServer(HostAndPort address, TProce * executor thread pool * @return A {@link ThreadPoolExecutor} which will resize itself automatically */ - private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, - final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, - long timeBetweenThreadChecks) { - String poolName = ACCUMULO_POOL_PREFIX.poolName + "." + serverName.toLowerCase() + ".client"; + private static ThreadPoolExecutor createSelfResizingThreadPool(final int executorThreads, + long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { + String poolName = RPC_POOL.poolName; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads) .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); @@ -277,9 +274,9 @@ private static ThreadPoolExecutor createSelfResizingThreadPool(final String serv * @return A configured TThreadPoolServer and its bound address information */ private static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, - TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads, - long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, - int backlog) throws TTransportException { + TProtocolFactory protocolFactory, long maxMessageSize, int numThreads, long threadTimeOut, + final AccumuloConfiguration conf, long timeBetweenThreadChecks, int backlog) + throws TTransportException { InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); // Must use an ISA, providing only a port would ignore the hostname given @@ -287,8 +284,8 @@ private static ServerAddress createBlockingServer(HostAndPort address, TProcesso ServerSocketTransportArgs args = new ServerSocketTransportArgs().backlog(backlog).bindAddr(isa); TServerSocket transport = new TServerSocket(args); - ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, - conf, timeBetweenThreadChecks); + ThreadPoolExecutor pool = + createSelfResizingThreadPool(numThreads, threadTimeOut, conf, timeBetweenThreadChecks); TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool); @@ -380,7 +377,7 @@ private static TServerSocket getSslServerSocket(int port, int timeout, InetAddre */ private static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams, - String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, + int numThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException { TServerSocket transport; try { @@ -396,8 +393,8 @@ private static ServerAddress createSslThreadPoolServer(HostAndPort address, TPro log.info("SSL Thread Pool Server bound on {}", address); } - ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, - conf, timeBetweenThreadChecks); + ThreadPoolExecutor pool = + createSelfResizingThreadPool(numThreads, threadTimeOut, conf, timeBetweenThreadChecks); return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), protocolFactory, pool), address); @@ -405,9 +402,8 @@ private static ServerAddress createSslThreadPoolServer(HostAndPort address, TPro private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params, - final String serverName, final int numThreads, final long threadTimeOut, - final AccumuloConfiguration conf, long timeBetweenThreadChecks, int backlog) - throws TTransportException { + final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, + long timeBetweenThreadChecks, int backlog) throws TTransportException { // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the // TThreadPoolServer does, // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it @@ -493,8 +489,8 @@ private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TPr log.info("SASL thrift server bound on {}", address); } - ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, - conf, timeBetweenThreadChecks); + ThreadPoolExecutor pool = + createSelfResizingThreadPool(numThreads, threadTimeOut, conf, timeBetweenThreadChecks); final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool); @@ -519,8 +515,8 @@ public static ServerAddress createThriftServer(final AccumuloConfiguration conf, try { return createThriftServer(serverType, new TimedProcessor(processor, metricsInfo), instanceId, - serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, - sslParams, saslParams, serverSocketTimeout, backlog, addresses); + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, + saslParams, serverSocketTimeout, backlog, addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } @@ -534,9 +530,9 @@ public static ServerAddress createThriftServer(final AccumuloConfiguration conf, * bound to. */ private static ServerAddress createThriftServer(ThriftServerType serverType, - TimedProcessor processor, InstanceId instanceId, String serverName, int numThreads, - long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, - long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, + TimedProcessor processor, InstanceId instanceId, int numThreads, long threadTimeOut, + final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, + SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout, int backlog, HostAndPort... addresses) throws TTransportException { TProtocolFactory protocolFactory = ThriftUtil.serverProtocolFactory(instanceId); // This is presently not supported. It's hypothetically possible, I believe, to work, but it @@ -553,29 +549,29 @@ private static ServerAddress createThriftServer(ThriftServerType serverType, case SSL -> { log.debug("Instantiating SSL Thrift server"); yield createSslThreadPoolServer(address, processor, protocolFactory, - serverSocketTimeout, sslParams, serverName, numThreads, threadTimeOut, conf, + serverSocketTimeout, sslParams, numThreads, threadTimeOut, conf, timeBetweenThreadChecks); } case SASL -> { log.debug("Instantiating SASL Thrift server"); yield createSaslThreadPoolServer(address, processor, protocolFactory, - serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf, + serverSocketTimeout, saslParams, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, backlog); } case THREADPOOL -> { log.debug("Instantiating unsecure TThreadPool Thrift server"); yield createBlockingServer(address, processor, protocolFactory, maxMessageSize, - serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, backlog); + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, backlog); } case THREADED_SELECTOR -> { log.debug("Instantiating default, unsecure Threaded selector Thrift server"); - yield createThreadedSelectorServer(address, processor, protocolFactory, serverName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); + yield createThreadedSelectorServer(address, processor, protocolFactory, numThreads, + threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); } case CUSTOM_HS_HA -> { log.debug("Instantiating unsecure custom half-async Thrift server"); - yield createNonBlockingServer(address, processor, protocolFactory, serverName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); + yield createNonBlockingServer(address, processor, protocolFactory, numThreads, + threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, backlog); } }; break; diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 21c2fcfacef..33c29715ff4 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -217,7 +217,7 @@ private ServerAddress startServer() throws Exception { String hostname = "localhost"; ServerAddress sa = TServerUtils.createThriftServer(context, hostname, Property.TSERV_CLIENTPORT, - processor, "TServerUtilsTest", Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, + processor, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); sa.startThriftServer("TServerUtilsTestThread"); return sa; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index d8c2c1889d4..8b5ed2a645f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -424,9 +424,8 @@ protected void startCompactorClientService() throws UnknownHostException { getCompactorThriftHandlerInterface(), getContext()); updateThriftServer(() -> { return TServerUtils.createThriftServer(getContext(), getBindAddress(), - Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), - Property.COMPACTOR_MINTHREADS, Property.COMPACTOR_MINTHREADS_TIMEOUT, - Property.COMPACTOR_THREADCHECK); + Property.COMPACTOR_CLIENTPORT, processor, Property.COMPACTOR_MINTHREADS, + Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK); }); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 9fb59deb181..aec42bf89b3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -952,7 +952,7 @@ public void run() { try { updateThriftServer(() -> { return TServerUtils.createThriftServer(context, getBindAddress(), - Property.MANAGER_CLIENTPORT, processor, "Manager", Property.MANAGER_MINTHREADS, + Property.MANAGER_CLIENTPORT, processor, Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); }); } catch (UnknownHostException e) { diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/views/ServersView.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/views/ServersView.java index 4705ae33d8f..49a8c313d0e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/views/ServersView.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/views/ServersView.java @@ -26,16 +26,21 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.metrics.Metric; import org.apache.accumulo.core.metrics.Metric.MetricDocSection; +import org.apache.accumulo.core.metrics.MonitorMeterRegistry; import org.apache.accumulo.core.metrics.flatbuffers.FMetric; import org.apache.accumulo.core.metrics.flatbuffers.FTag; import org.apache.accumulo.core.process.thrift.MetricResponse; +import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.accumulo.monitor.next.SystemInformation; import org.apache.accumulo.server.metrics.MetricResponseWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; @@ -73,7 +78,7 @@ public record Column(String key, String label, String description, String uiClas } private record ServerMetricRow(ServerId server, MetricResponse response, - Map metrics) { + Map> metrics) { } /** @@ -102,11 +107,47 @@ public enum ServerTable { /** * Common columns that are included in every ServersView table */ - private static final List COMMON_COLUMNS = List.of( - new Column(TIME_COL_KEY, "Last Contact", - "Time since the server last responded to the monitor", "duration"), - new Column(RG_COL_KEY, "Resource Group", "Resource Group", ""), - new Column(ADDR_COL_KEY, "Server Address", "Server address", "")); + private static final List COMMON_COLUMNS = List.of(new ColumnFactory() { + private final Column column = new Column(TIME_COL_KEY, "Last Contact", + "Time since the server last responded to the monitor", "duration"); + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + return mr == null ? null : System.currentTimeMillis() - mr.getTimestamp(); + } + }, new ColumnFactory() { + private final Column column = new Column(RG_COL_KEY, "Resource Group", "Resource Group", ""); + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + return sid.getResourceGroup().canonical(); + } + }, new ColumnFactory() { + private final Column column = new Column(ADDR_COL_KEY, "Server Address", "Server address", ""); + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + return sid.toHostPortString(); + } + }); public final List> data = new ArrayList<>(); public final List columns; @@ -115,14 +156,14 @@ public enum ServerTable { public ServersView(final Set servers, final long problemServerCount, final Cache allMetrics, final long timestamp, - final List requestedColumns) { + final List requestedColumns) { AtomicInteger serversMissingMetrics = new AtomicInteger(0); // Grab the current metrics for each server List serverMetricRows = servers.stream().sorted().map(serverId -> { MetricResponse metricResponse = allMetrics.getIfPresent(serverId); boolean hasMetricData = hasMetricData(metricResponse); - Map serverMetrics = + Map> serverMetrics = hasMetricData ? metricValuesByName(metricResponse) : Map.of(); if (!hasMetricData) { @@ -132,12 +173,12 @@ public ServersView(final Set servers, final long problemServerCount, return new ServerMetricRow(serverId, metricResponse, serverMetrics); }).toList(); - this.columns = requestedColumns; + this.columns = requestedColumns.stream().map(ColumnFactory::getColumn).toList(); serverMetricRows.forEach(serverMetricRow -> { Map row = new LinkedHashMap<>(); - for (Column col : columns) { - row.put(col.key(), valueForColumn(col.key(), serverMetricRow.server(), + for (ColumnFactory colf : requestedColumns) { + row.put(colf.getColumn().key(), colf.getRowData(serverMetricRow.server(), serverMetricRow.response(), serverMetricRow.metrics())); } data.add(row); @@ -146,16 +187,6 @@ public ServersView(final Set servers, final long problemServerCount, this.timestamp = timestamp; } - private static Object valueForColumn(String key, ServerId sid, MetricResponse mr, - Map serverMetrics) { - return switch (key) { - case TIME_COL_KEY -> mr == null ? null : System.currentTimeMillis() - mr.getTimestamp(); - case RG_COL_KEY -> sid.getResourceGroup().canonical(); - case ADDR_COL_KEY -> sid.toHostPortString(); - default -> serverMetrics.get(key); - }; - } - private static Status buildStatus(int serverCount, long problemServerCount, int serversMissingMetrics) { final boolean hasServers = serverCount > 0; @@ -184,28 +215,268 @@ private static boolean hasMetricData(MetricResponse mr) { return mr != null && mr.getMetrics() != null && !mr.getMetrics().isEmpty(); } + public interface ColumnFactory { + Column getColumn(); + + Object getRowData(ServerId sid, MetricResponse mr, Map> serverMetrics); + } + + private static class RatioColumnFactory implements ColumnFactory { + + private final Column column; + private final Metric numerator; + private final Metric denominator; + + RatioColumnFactory(String label, String description, Metric numerator, Metric denominator) { + this.column = new Column(numerator.getName() + "/" + denominator.getName(), label, + description, Metric.MonitorCssClass.PERCENT.getCssClass()); + this.numerator = numerator; + this.denominator = denominator; + } + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + var n = serverMetrics.get(numerator.getName()); + var d = serverMetrics.get(denominator.getName()); + + if (n == null || d == null) { + return null; + } + + var numeratorSum = sum(n).doubleValue(); + var denominatorSum = sum(n).doubleValue(); + + if (denominatorSum == 0) { + return null; + } + + return numeratorSum / denominatorSum; + } + } + + private static class MetricColumnFactory implements ColumnFactory { + + private final Column column; + private final boolean computeRate; + + MetricColumnFactory(Metric metric) { + String classes; + if (metric.getType() == Metric.MetricType.FUNCTION_COUNTER) { + if (Arrays.asList(metric.getColumnClasses()).contains(Metric.MonitorCssClass.BYTES)) { + classes = Metric.MonitorCssClass.BYTES_RATE.getCssClass(); + } else { + classes = Metric.MonitorCssClass.RATE.getCssClass(); + } + computeRate = true; + } else { + classes = Arrays.stream(metric.getColumnClasses()).map(Metric.MonitorCssClass::getCssClass) + .collect(Collectors.joining(" ")); + computeRate = false; + } + this.column = new Column(metric.getName(), metric.getColumnHeader(), + metric.getColumnDescription(), classes); + } + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + var sum = sum(serverMetrics.getOrDefault(column.key, List.of())); + if (computeRate) { + return computeRate(sum); + } else { + return sum; + } + } + } + + private static class ExecutorColumnFactory implements ColumnFactory { + + private static final Logger log = LoggerFactory.getLogger(ExecutorColumnFactory.class); + + private final Column column; + private final String metricName; + private final Predicate tagPredicate; + private final Type type; + + enum Type { + COMPLETED, QUEUED + } + + String getMetricName(Type t) { + return switch (t) { + case COMPLETED -> Metric.EXECUTOR_COMPLETED.getName(); + case QUEUED -> Metric.EXECUTOR_QUEUED.getName(); + }; + } + + String getCssClass(Type t) { + return switch (t) { + case COMPLETED -> Metric.MonitorCssClass.RATE.getCssClass(); + case QUEUED -> Metric.MonitorCssClass.NUMBER.getCssClass(); + }; + } + + ExecutorColumnFactory(Type type, String theadPoolPrefix, String label, String description) { + this.type = type; + this.metricName = getMetricName(type); + this.tagPredicate = s -> s.startsWith(theadPoolPrefix); + this.column = + new Column(metricName + "-" + theadPoolPrefix, label, description, getCssClass(type)); + } + + ExecutorColumnFactory(Type type, String keySuffix, Predicate tagPredicate, String label, + String description) { + this.type = type; + this.metricName = getMetricName(type); + this.tagPredicate = tagPredicate; + this.column = new Column(metricName + "-" + keySuffix, label, description, getCssClass(type)); + } + + @Override + public Column getColumn() { + return column; + } + + @Override + public Object getRowData(ServerId sid, MetricResponse mr, + Map> serverMetrics) { + + var metrics = serverMetrics.getOrDefault(metricName, List.of()); + + Number sum = null; + + FTag ftag = new FTag(); + for (var metric : metrics) { + boolean foundTag = false; + String tag = null; + for (int i = 0; i < metric.tagsLength(); i++) { + metric.tags(ftag, i); + var key = ftag.key(); + var value = ftag.value(); + if (key != null && value != null && key.equals("name") && tagPredicate.test(value)) { + foundTag = true; + tag = value; + break; + } + } + + var metricStatistic = extractStatistic(metric); + if (foundTag && (metricStatistic == null || metricStatistic.equals("value") + || metricStatistic.equals("count"))) { + var val = SystemInformation.getMetricValue(metric); + log.trace("adding {}+{} for {} {} {} {}", sum, val, metric.name(), tag, + sid.toHostPortString(), sid.getResourceGroup()); + sum = add(sum, SystemInformation.getMetricValue(metric)); + + } + } + + if (type == Type.COMPLETED) { + // Convert to a rate + return computeRate(sum); + } + + return sum; + } + + } + + private static Number computeRate(Number sum) { + if (sum == null) { + return null; + } + return sum.doubleValue() / MonitorMeterRegistry.STEP.toSeconds(); + } + /** * Builds the final ordered columns for a table. First adds the common columns, then adds the * table specific metrics. */ - public static List columnsFor(ServerTable table) { - List cols = new ArrayList<>(COMMON_COLUMNS); - cols.addAll(metricsForTable(table).stream().map(ServersView::metricColumn).toList()); + public static List columnsFor(ServerTable table) { + List cols = new ArrayList<>(COMMON_COLUMNS); + + switch (table) { + case COMPACTORS -> compactorMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case GC_SUMMARY -> gcSummaryMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case GC_FILES -> gcFileMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case GC_WALS -> gcWalMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case MANAGERS -> managerMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case MANAGER_FATE -> managerFateMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case MANAGER_COMPACTIONS -> + managerCompactionMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case SCAN_SERVERS -> scanServerMetrics().forEach(m -> cols.add(new MetricColumnFactory(m))); + case TABLET_SERVERS -> tabletServerColumns(cols); + } return cols; } - private static List metricsForTable(ServerTable table) { - return switch (table) { - case COMPACTORS -> compactorMetrics(); - case GC_SUMMARY -> gcSummaryMetrics(); - case GC_FILES -> gcFileMetrics(); - case GC_WALS -> gcWalMetrics(); - case MANAGERS -> managerMetrics(); - case MANAGER_FATE -> managerFateMetrics(); - case MANAGER_COMPACTIONS -> managerCompactionMetrics(); - case SCAN_SERVERS -> scanServerMetrics(); - case TABLET_SERVERS -> tabletServerMetrics(); - }; + private static void tabletServerColumns(List cols) { + cols.add(new MetricColumnFactory(Metric.SERVER_IDLE)); + cols.add(new MetricColumnFactory(Metric.LOW_MEMORY)); + + cols.add(new MetricColumnFactory(Metric.TSERVER_TABLETS_ONLINE)); + cols.add(new MetricColumnFactory(Metric.TSERVER_TABLETS_LONG_ASSIGNMENTS)); + + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.COMPLETED, + ThreadPoolNames.RPC_POOL.poolName, "Completed RPCs", + "Task completed by the Thrift thread pool")); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.QUEUED, + ThreadPoolNames.RPC_POOL.poolName, "Queued RPCs", + "Task queued for the Thrift thread pool")); + + // Scan columns + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.COMPLETED, + ThreadPoolNames.SCAN_EXECUTOR_PREFIX.poolName, "Completed scans", + "Scan task completed by all scan thread pools")); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.QUEUED, + ThreadPoolNames.SCAN_EXECUTOR_PREFIX.poolName, "Queued scans", + "Scan task queued on all scan thread pools")); + cols.add(new MetricColumnFactory(Metric.SCAN_ERRORS)); + cols.add(new MetricColumnFactory(Metric.SCAN_SCANNED_ENTRIES)); + cols.add(new MetricColumnFactory(Metric.SCAN_QUERY_SCAN_RESULTS)); + cols.add(new MetricColumnFactory(Metric.SCAN_QUERY_SCAN_RESULTS_BYTES)); + cols.add(new RatioColumnFactory("Index cache hit", + "Ratio of hits/total request for the index block cache", Metric.BLOCKCACHE_INDEX_HITCOUNT, + Metric.BLOCKCACHE_INDEX_REQUESTCOUNT)); + cols.add(new RatioColumnFactory("Data cache hit", + "Ratio of hits/total request for the data block cache", Metric.BLOCKCACHE_DATA_HITCOUNT, + Metric.BLOCKCACHE_DATA_REQUESTCOUNT)); + + // Ingest and minc + cols.add(new MetricColumnFactory(Metric.TSERVER_INGEST_ENTRIES)); + cols.add(new MetricColumnFactory(Metric.TSERVER_INGEST_BYTES)); + cols.add(new MetricColumnFactory(Metric.TSERVER_HOLD)); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.COMPLETED, + ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL.poolName, "Completed MinC", + "Task completed by the minor compaction thread pool")); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.QUEUED, + ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL.poolName, "Queued MinC", + "Task queued for the minor compaction thread pool")); + + // conditional update + Predicate condTagPredicate = + tag -> ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName.equals(tag) + || ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName.equals(tag) + || ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName.equals(tag); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.COMPLETED, "conditional.update", + condTagPredicate, "Completed Conditional", + "Task completed by the conditional update thread pool")); + cols.add(new ExecutorColumnFactory(ExecutorColumnFactory.Type.QUEUED, "conditional.update", + condTagPredicate, "Queued Conditional", + "Task queued for the conditional update thread pool")); + + // TODO create scan problems that is a sum of zombie and low memory } /** @@ -267,18 +538,35 @@ private static List metricList(MetricDocSection... sections) { .collect(Collectors.toList()); } - /** - * @return a Column definition converted from the given Metric - */ - private static Column metricColumn(Metric metric) { - String classes = Arrays.stream(metric.getColumnClasses()) - .map(Metric.MonitorCssClass::getCssClass).collect(Collectors.joining(" ")); - return new Column(metric.getName(), metric.getColumnHeader(), metric.getColumnDescription(), - classes); + public static Number add(Number n1, Number n2) { + if (n1 == null && n2 == null) { + return null; + } else if (n1 == null) { + return n2; + } else if (n2 == null) { + return n1; + } else if (n1 instanceof Double || n2 instanceof Double) { + return n1.doubleValue() + n2.doubleValue(); + } else if (n1 instanceof Long || n2 instanceof Long) { + return n1.longValue() + n2.longValue(); + } else if (n1 instanceof Integer || n2 instanceof Integer) { + return n1.intValue() + n2.intValue(); + } else { + throw new IllegalArgumentException( + "Unexpected value type: " + n1.getClass().getName() + " " + n2.getClass().getName()); + } } - public static Map metricValuesByName(MetricResponse response) { - var values = new HashMap(); + private static Number sum(List metrics) { + Number sum = null; + for (var metric : metrics) { + sum = add(sum, SystemInformation.getMetricValue(metric)); + } + return sum; + } + + public static Map> metricValuesByName(MetricResponse response) { + var values = new HashMap>(); if (response == null || response.getMetrics() == null || response.getMetrics().isEmpty()) { return values; } @@ -288,29 +576,16 @@ public static Map metricValuesByName(MetricResponse response) { var metricStatistic = extractStatistic(metric); if (metricStatistic == null || metricStatistic.equals("value") || metricStatistic.equals("count")) { - // For metrics with the same name, but different tags, compute a sum - Number val = SystemInformation.getMetricValue(metric); - values.compute(metric.name(), (k, v) -> { - if (v == null) { - return val; - } else if (v instanceof Integer i) { - return i + val.intValue(); - } else if (v instanceof Long l) { - return l + val.longValue(); - } else if (v instanceof Double d) { - return d + val.doubleValue(); - } else { - throw new RuntimeException("Unexpected value type: " + val.getClass()); - } - }); + values.computeIfAbsent(metric.name(), m -> new ArrayList<>()).add(metric); } } return values; } private static String extractStatistic(FMetric metric) { + FTag tag = new FTag(); for (int i = 0; i < metric.tagsLength(); i++) { - FTag tag = metric.tags(i); + tag = metric.tags(tag, i); if (MetricResponseWrapper.STATISTIC_TAG.equals(tag.key())) { return normalizeStatistic(tag.value()); } diff --git a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/server_process_common.js b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/server_process_common.js index 000f7a8a021..75b4c79b053 100644 --- a/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/server_process_common.js +++ b/server/monitor/src/main/resources/org/apache/accumulo/monitor/resources/js/server_process_common.js @@ -27,7 +27,7 @@ * This file contains methods used to display tables on the Monitor's * pages for server processes. The REST Endpoint /rest-v2/servers/view;table= * returns a data structure that has the following format: - * + * * { * "data": [ * { @@ -57,10 +57,10 @@ * }, * timestamp: long * } - * + * * The value for the 'columns' key is an array of column definitions. The value for the * 'data' key is an array of row objects keyed by the column 'key' values. - * + * * The 'columns' array is used to dynamically create table header rows in the html and * the 'data' object is directly consumed by the DataTable where each object in the 'data' * is a row in the table and each field in the object is a column. @@ -197,7 +197,7 @@ function showBannerError(banner, bannerMsg) { /** * This function refreshes the table and banner, showing an * empty table and error banner if not successful - * + * * callback - the method to use to invoke the REST API call to get the data * table - reference to HTML table object in which to create table header columns * storageKey - the session storage key for the data returned from the REST API @@ -263,6 +263,30 @@ function createDataTable(table, storageKey) { return data; } }, + { + "targets": "rate-num", + "render": function (data, type) { + if (type === 'display') { + if (data === null || data === undefined) { + return '—'; + } + data = bigNumberForQuantity(data).toString()+"/s"; + } + return data; + } + }, + { + "targets": "rate-size", + "render": function (data, type) { + if (type === 'display') { + if (data === null || data === undefined) { + return '—'; + } + data = bigNumberForSize(data).toString()+"/s"; + } + return data; + } + }, { "targets": "start-date", "render": function (data, type, row) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 7cc1e02fd7f..bcfc99369f0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -311,8 +311,8 @@ protected void startScanServerClientService() throws UnknownHostException { updateThriftServer(() -> { return TServerUtils.createThriftServer(getContext(), getBindAddress(), - Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), - Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK); + Property.SSERV_CLIENTPORT, processor, Property.SSERV_MINTHREADS, + Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK); }); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index c91ac319c55..5b7784facf6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -410,8 +410,8 @@ AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) { private void startServer(String address, TProcessor processor) throws UnknownHostException { updateThriftServer(() -> { return TServerUtils.createThriftServer(getContext(), address, Property.TSERV_CLIENTPORT, - processor, this.getClass().getSimpleName(), Property.TSERV_MINTHREADS, - Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + processor, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, + Property.TSERV_THREADCHECK); }); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index deb5e711468..a904aa6cf0d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -23,9 +23,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; -import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_EXECUTOR_PREFIX; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; @@ -202,14 +202,14 @@ public ServiceEnvironment getServiceEnv() { } scanExecQueues.put(sec.name, queue); - ThreadPoolExecutor es = ThreadPools.getServerThreadPools() - .getPoolBuilder(ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name) - .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) - .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) - .enableThreadPoolMetrics(enableMetrics).build(); + ThreadPoolExecutor es = + ThreadPools.getServerThreadPools().getPoolBuilder(SCAN_EXECUTOR_PREFIX.poolName + sec.name) + .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) + .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) + .enableThreadPoolMetrics(enableMetrics).build(); modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, - ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name, es); + SCAN_EXECUTOR_PREFIX.poolName + sec.name, es); return es; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 9275f6fd326..8b8fadc60f7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -21,6 +21,7 @@ import static org.apache.accumulo.core.metrics.Metric.SCAN_BUSY_TIMEOUT_COUNT; import static org.apache.accumulo.core.metrics.Metric.SCAN_CLOSE; import static org.apache.accumulo.core.metrics.Metric.SCAN_CONTINUE; +import static org.apache.accumulo.core.metrics.Metric.SCAN_ERRORS; import static org.apache.accumulo.core.metrics.Metric.SCAN_OPEN_FILES; import static org.apache.accumulo.core.metrics.Metric.SCAN_PAUSED_FOR_MEM; import static org.apache.accumulo.core.metrics.Metric.SCAN_QUERIES; @@ -65,6 +66,7 @@ public class TabletServerScanMetrics implements MetricsProducer { private final LongAdder queryResultCount = new LongAdder(); private final LongAdder queryResultBytes = new LongAdder(); private final LongAdder scannedCount = new LongAdder(); + private final LongAdder scanErrorCount = new LongAdder(); public void incrementLookupCount() { this.lookupCount.increment(); @@ -126,6 +128,10 @@ public long getZombieThreadsCount() { return zombieScanThreads.get(); } + public void incrementScanErrors() { + scanErrorCount.increment(); + } + public TabletServerScanMetrics(IntSupplier openFileSupplier) { openFiles = openFileSupplier; } @@ -158,14 +164,18 @@ public void registerMetrics(MeterRegistry registry) { FunctionCounter .builder(SCAN_RETURN_FOR_MEM.getName(), this.earlyReturnForMemory, AtomicLong::get) .description(SCAN_RETURN_FOR_MEM.getDescription()).register(registry); - Gauge.builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) + FunctionCounter + .builder(SCAN_QUERY_SCAN_RESULTS.getName(), this.queryResultCount, LongAdder::sum) .description(SCAN_QUERY_SCAN_RESULTS.getDescription()).register(registry); - Gauge.builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) + FunctionCounter + .builder(SCAN_QUERY_SCAN_RESULTS_BYTES.getName(), this.queryResultBytes, LongAdder::sum) .description(SCAN_QUERY_SCAN_RESULTS_BYTES.getDescription()).register(registry); Gauge .builder(SCAN_ZOMBIE_THREADS.getName(), this, TabletServerScanMetrics::getZombieThreadsCount) .description(SCAN_ZOMBIE_THREADS.getDescription()).register(registry); + FunctionCounter.builder(SCAN_ERRORS.getName(), this.scanErrorCount, LongAdder::sum) + .description(SCAN_ERRORS.getDescription()).register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java index 8b2d2c0139f..2fb384270b9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java @@ -177,12 +177,14 @@ public void run() { addResult(multiScanResult); } catch (IterationInterruptedException iie) { if (!isCancelled()) { + server.getScanMetrics().incrementScanErrors(); log.warn("Iteration interrupted, when scan not cancelled", iie); addResult(iie); } } catch (SampleNotPresentException e) { addResult(e); } catch (Exception e) { + server.getScanMetrics().incrementScanErrors(); log.warn("exception while doing multi-scan ", e); addResult(e); } finally { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index 96251bd59b0..09fcba83299 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -89,12 +89,14 @@ public void run() { scanSession.extent.toThrift())); } catch (IterationInterruptedException iie) { if (!isCancelled()) { + server.getScanMetrics().incrementScanErrors(); log.warn("Iteration interrupted, when scan not cancelled", iie); addResult(iie); } } catch (TooManyFilesException | SampleNotPresentException e) { addResult(e); } catch (IOException | RuntimeException e) { + server.getScanMetrics().incrementScanErrors(); log.warn("exception while scanning tablet {} for {}", scanSession.extent, scanSession.client, e); addResult(e); From 493a02ecf424dc0b76112aba2bc313e2746fda63 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 23 Apr 2026 14:43:35 +0000 Subject: [PATCH 2/2] rollback bug fix fixed in another PR --- .../main/java/org/apache/accumulo/server/AbstractServer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 052df4d1205..7e34df0e803 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -99,7 +99,6 @@ public static void startServer(AbstractServer server, Logger LOG) throws Excepti private final AtomicBoolean closed = new AtomicBoolean(false); private final Set monitorMetricExclusions; - @SuppressWarnings("deprecation") protected AbstractServer(ServerId.Type serverType, ServerOpts opts, BiFunction serverContextFactory, String[] args) { @@ -107,8 +106,7 @@ protected AbstractServer(ServerId.Type serverType, ServerOpts opts, this.applicationName = serverType.name(); opts.parseArgs(applicationName, args); var siteConfig = opts.getSiteConfiguration(); - final String newBindParameter = siteConfig.get(siteConfig - .resolve(Property.RPC_PROCESS_BIND_ADDRESS, Property.GENERAL_PROCESS_BIND_ADDRESS)); + final String newBindParameter = siteConfig.get(Property.RPC_PROCESS_BIND_ADDRESS); // If new bind parameter passed on command line or in file, then use it. if (newBindParameter != null && !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) {