From ea3b1b3a9aa25d6c70d335d1f0fde20237856176 Mon Sep 17 00:00:00 2001 From: architjainjain Date: Wed, 20 May 2026 18:01:03 +0530 Subject: [PATCH] HIVE-27126: queue level resource stats for YARN RM. --- .../hive/common/log/ProgressMonitor.java | 14 + .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../ql/exec/tez/TezSessionPoolManager.java | 3 + .../exec/tez/monitoring/RenderStrategy.java | 36 +- .../ql/exec/tez/monitoring/TezJobMonitor.java | 10 +- .../tez/monitoring/TezProgressMonitor.java | 40 +- .../tez/monitoring/YarnMetricsHelper.java | 440 ++++++++++++++++++ .../monitoring/TestTezProgressMonitor.java | 34 ++ .../tez/monitoring/TestYarnMetricsHelper.java | 304 ++++++++++++ 9 files changed, 878 insertions(+), 7 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/YarnMetricsHelper.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestYarnMetricsHelper.java diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java index 67dd7ca02f1e..507608f1e2f2 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java @@ -52,6 +52,11 @@ public String executionStatus() { public double progressedPercentage() { return 0; } + + @Override + public List getResourceLines() { + return Collections.emptyList(); + } }; List headers(); @@ -65,4 +70,13 @@ public double progressedPercentage() { String executionStatus(); double progressedPercentage(); + + /** + * Returns zero or more supplementary resource-info lines to render after the progress bar. + * Implementations that have no resource info should return an empty list (the default). + * Lines are rendered as-is by the render strategy — no additional formatting is applied. + */ + default List getResourceLines() { + return Collections.emptyList(); + } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31b5e32c2ddb..d1067664a620 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4859,6 +4859,10 @@ public static enum ConfVars { + " only if the execution engine is tez."), TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."), + TEZ_YARN_METRICS_REFRESH_INTERVAL("hive.tez.yarn.metrics.refresh.interval", "5000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Interval between YARN queue/cluster resource metric refreshes shown in the Tez progress monitor. " + + "Set to 0 to disable YARN resource display."), TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f, "This is to override the tez setting with the same name"), TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 605a92ebc8f5..33dd6217571d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.YarnMetricsHelper; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; @@ -397,6 +398,8 @@ public void stop() throws Exception { metrics.stop(); instance = null; + // Release the shared YarnClient used for queue resource metrics display. + YarnMetricsHelper.shutdown(); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java index 132489e4906a..48e8e1d371c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.io.StringWriter; +import java.util.List; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; @@ -63,7 +64,11 @@ private abstract static class BaseUpdateFunction implements UpdateFunction { @Override public void update(DAGStatus status, Map vertexProgressMap) { - renderProgress(monitor.progressMonitor(status, vertexProgressMap)); + ProgressMonitor progressMonitor = monitor.progressMonitor(status, vertexProgressMap); + renderProgress(progressMonitor); + // Render supplementary resource lines (e.g. YARN queue/cluster metrics). + // getResourceLines() returns an empty list when metrics are unavailable — no cast needed. + renderResourceLines(progressMonitor.getResourceLines()); String report = getReport(vertexProgressMap); if (showReport(report)) { renderReport(report); @@ -137,14 +142,23 @@ private String getReport(Map progressMap) { return reportBuffer.toString(); } + /** Renders the primary progress table (vertex rows + progress bar). */ abstract void renderProgress(ProgressMonitor progressMonitor); + /** + * Renders supplementary resource lines returned by + * {@link ProgressMonitor#getResourceLines()}. + * Called after {@link #renderProgress} with whatever the monitor returned — + * may be an empty list, in which case nothing should be printed. + */ + abstract void renderResourceLines(List lines); + abstract void renderReport(String report); } /** - * this adds the required progress update to the session state that is used by HS2 to send the - * same information to beeline client when requested. + * Used by HiveServer2: stores the monitor in SessionState so beeline can poll it, + * and logs resource lines to the server log. */ static class LogToFileFunction extends BaseUpdateFunction { private static final Logger LOGGER = LoggerFactory.getLogger(LogToFileFunction.class); @@ -164,6 +178,13 @@ public void renderProgress(ProgressMonitor progressMonitor) { SessionState.get().updateProgressMonitor(progressMonitor); } + @Override + public void renderResourceLines(List lines) { + if (hiveServer2InPlaceProgressEnabled) { + lines.forEach(LOGGER::info); + } + } + @Override public void renderReport(String report) { if (hiveServer2InPlaceProgressEnabled) { @@ -176,8 +197,7 @@ public void renderReport(String report) { } /** - * This used when we want the progress update to printed in the same process typically used via - * hive-cli mode. + * Used by hive-cli: renders the progress table directly to the terminal in-place. */ static class InPlaceUpdateFunction extends BaseUpdateFunction { /** @@ -196,6 +216,12 @@ public void renderProgress(ProgressMonitor progressMonitor) { inPlaceUpdate.render(progressMonitor); } + @Override + public void renderResourceLines(List lines) { + lines.forEach(line -> + InPlaceUpdate.reprintLine(SessionState.LogHelper.getInfoStream(), line)); + } + @Override public void renderReport(String report) { monitor.console.logInfo(report); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 92844f4d5716..0c774d856643 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -119,6 +119,10 @@ public static void initShutdownHook() { private final RenderStrategy.UpdateFunction updateFunction; // compile time tez counters private final TezCounters counters; + /** Live YARN metrics helper; may be null when the feature is disabled or YARN is unavailable. */ + private final YarnMetricsHelper yarnMetricsHelper; + /** YARN queue name for this query. */ + private final String yarnQueueName; public TezJobMonitor(TezSession session, List topSortedWorks, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx, final TezCounters counters, PerfLogger perfLogger) { @@ -134,6 +138,9 @@ public TezJobMonitor(TezSession session, List topSortedWorks, final DA this.counters = counters; this.shouldCollectSummaryString = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ENABLED) && conf.getBoolVar(ConfVars.HIVE_QUERY_HISTORY_EXEC_SUMMARY_ENABLED); + String queueName = session.getQueueName(); + this.yarnQueueName = (queueName != null && !queueName.isEmpty()) ? queueName : "default"; + this.yarnMetricsHelper = YarnMetricsHelper.getInstance(conf); } private RenderStrategy.UpdateFunction updateFunction() { @@ -479,6 +486,7 @@ private static boolean hasInterruptedException(Throwable e) { return false; } + /** * killRunningJobs tries to terminate execution of all * currently running tez queries. No guarantees, best effort only. @@ -516,7 +524,7 @@ public String getDiagnostics() { ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { try { return new TezProgressMonitor(dagClient, status, topSortedWorks, progressMap, console, - executionStartTime); + executionStartTime, yarnMetricsHelper, yarnQueueName); } catch (IOException | TezException e) { console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " + ExceptionUtils.getStackTrace(e)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java index 735442d2d1c8..ce243a87d35a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java @@ -26,6 +26,7 @@ import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.VertexStatus; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -44,20 +45,41 @@ public class TezProgressMonitor implements ProgressMonitor { private final SessionState.LogHelper console; private final long executionStartTime; private final DAGStatus status; + /** Optional helper for fetching live YARN resource metrics; may be null if disabled. */ + @Nullable + private final YarnMetricsHelper yarnMetricsHelper; + /** YARN queue name resolved at construction time; never null. */ + private final String yarnQueueName; Map vertexStatusMap = new HashMap<>(); Map progressCountsMap = new HashMap<>(); /** * Try to get most the data required from dagClient in the constructor itself so that even after - * the tez job has finished this object can be used for later use.s + * the tez job has finished this object can be used for later use. */ TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, Map progressMap, SessionState.LogHelper console, long executionStartTime) throws IOException, TezException { + this(dagClient, status, topSortedWork, progressMap, console, executionStartTime, + null, YarnMetricsHelper.NA); + } + + /** + * Full constructor that includes YARN metrics support. + * + * @param yarnMetricsHelper live YARN helper; may be {@code null} to disable YARN metrics. + * @param yarnQueueName name of the YARN queue used for this query. + */ + TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, + Map progressMap, SessionState.LogHelper console, long executionStartTime, + @Nullable YarnMetricsHelper yarnMetricsHelper, String yarnQueueName) + throws IOException, TezException { this.status = status; this.topSortedWork = topSortedWork; this.console = console; this.executionStartTime = executionStartTime; + this.yarnMetricsHelper = yarnMetricsHelper; + this.yarnQueueName = yarnQueueName != null ? yarnQueueName : YarnMetricsHelper.NA; for (Map.Entry entry : progressMap.entrySet()) { String vertexName = entry.getKey(); progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState())); @@ -330,4 +352,20 @@ public int hashCode() { public DAGStatus getStatus() { return status; } + + /** + * Overrides {@link ProgressMonitor#getResourceLines()} to supply real-time YARN queue + * and cluster resource lines when a {@link YarnMetricsHelper} is available. + * + *

Called by the render strategies — no {@code instanceof} cast required there. + * Returns an empty list (no YARN lines) when metrics are unavailable or disabled. + */ + @Override + public List getResourceLines() { + if (yarnMetricsHelper == null) { + return Collections.emptyList(); + } + YarnMetricsHelper.ResourceInfo info = yarnMetricsHelper.getResourceInfo(yarnQueueName); + return Arrays.asList(info.formatQueueLine(), info.formatClusterLine()); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/YarnMetricsHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/YarnMetricsHelper.java new file mode 100644 index 000000000000..43183738894d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/YarnMetricsHelper.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Process-scoped singleton helper for fetching real-time YARN resource metrics + * alongside the Tez progress monitor (HIVE-27126). + * + *

Design rationale

+ *
    + *
  • One {@link YarnClient} IPC connection is shared per distinct YARN cluster (identified + * by RM address or HA IDs), eliminating per-query connection setup overhead and + * O(N) open sockets. Multi-cluster deployments are transparently supported: each + * distinct RM address gets its own independent instance.
  • + *
  • Metrics are cached per-queue for {@code hive.tez.yarn.metrics.refresh.interval} + * (default 5 s). All queries on the same queue share one cached snapshot, so + * 100 concurrent queries on {@code root.default} still produce at most one RM + * call every 5 s — not 100.
  • + *
  • A per-queue {@link ReentrantLock} (non-blocking try-lock) prevents thundering-herd + * on cache expiry: only one thread refreshes; all others return the stale + * snapshot immediately rather than queuing behind the RM call.
  • + *
  • Instances are registered in a process-scoped registry keyed by RM address and + * destroyed by {@link #shutdown()} which should be called from + * {@code TezSessionPoolManager.stop()}.
  • + *
+ * + *

Thread-safe. {@link #getResourceInfo(String)} may be called from any number of + * concurrent query-monitor threads. + */ +public class YarnMetricsHelper implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(YarnMetricsHelper.class); + + /** Placeholder rendered when a metric cannot be obtained. */ + static final String NA = "N/A"; + + // MB → GB divisor (integer division is intentional — display only) + private static final long MB_PER_GB = 1024L; + + // ------------------------------------------------------------------------- + // Process-scoped registry — one instance per distinct RM address + // ------------------------------------------------------------------------- + + /** + * Guards the registry map. We use a single lock (not ConcurrentHashMap) because + * creation is rare and we need atomicity of "check + create". + */ + private static final Object REGISTRY_LOCK = new Object(); + + /** + * Key: canonical RM address string (or RM HA ids string) — identifies which YARN cluster. + * Value: the live helper for that cluster. + * + * In practice this map will almost always have exactly one entry (single YARN cluster). + * It supports multi-cluster deployments without any extra configuration. + */ + private static final java.util.Map REGISTRY = + new java.util.LinkedHashMap<>(); + + /** + * Derives a stable key that uniquely identifies the YARN cluster from {@code conf}. + * Uses RM HA IDs when HA is enabled, otherwise the single RM address. + */ + static String resolveRmKey(Configuration conf) { + boolean ha = conf.getBoolean(YarnConfiguration.RM_HA_ENABLED, false); + if (ha) { + String ids = conf.get(YarnConfiguration.RM_HA_IDS, ""); + return "ha:" + ids; + } + // Fall back to the IPC address (not the webapp address) — that's what YarnClient uses. + return conf.get(YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS); + } + + /** + * Returns (and lazily creates) a {@link YarnMetricsHelper} for the YARN cluster + * identified by {@code conf}. + * + *

Singleton semantics per RM: the {@code conf} supplied on the first + * call for a given RM address is used to create the {@link YarnClient}. Subsequent calls + * for the same RM address return the cached instance (conf ignored except for + * logging a warning if the refresh interval differs). Calls targeting a different + * RM address get their own independent instance — this transparently supports multi-cluster + * deployments. + * + *

Returns {@code null} when the feature is disabled + * ({@code hive.tez.yarn.metrics.refresh.interval <= 0}). + */ + public static YarnMetricsHelper getInstance(HiveConf conf) { + long intervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.TEZ_YARN_METRICS_REFRESH_INTERVAL, TimeUnit.MILLISECONDS); + if (intervalMs <= 0) { + LOG.debug("YARN resource metrics disabled (interval <= 0)"); + return null; + } + + String rmKey = resolveRmKey(conf); + + // Fast path: already initialised for this RM. + synchronized (REGISTRY_LOCK) { + YarnMetricsHelper existing = REGISTRY.get(rmKey); + if (existing != null) { + if (existing.refreshIntervalMs != intervalMs) { + LOG.warn("YarnMetricsHelper for RM '{}' already initialised with refresh interval {} ms; " + + "ignoring requested interval {} ms.", rmKey, existing.refreshIntervalMs, intervalMs); + } + return existing; + } + // Not yet created for this RM — create now (still under lock to prevent races). + try { + YarnMetricsHelper instance = new YarnMetricsHelper(createAndStartYarnClient(conf), intervalMs); + REGISTRY.put(rmKey, instance); + LOG.info("YarnMetricsHelper created for RM '{}' (refresh interval: {} ms)", rmKey, intervalMs); + return instance; + } catch (Exception e) { + LOG.warn("Could not create YarnMetricsHelper for RM '{}'; YARN resource metrics will not be shown: {}", + rmKey, e.getMessage()); + LOG.debug("YarnMetricsHelper init failure detail", e); + return null; + } + } + } + + /** + * Shuts down all registered instances and releases their underlying {@link YarnClient}s. + * Should be called once from {@code TezSessionPoolManager.stop()}. + * Safe to call multiple times. + */ + public static void shutdown() { + synchronized (REGISTRY_LOCK) { + for (java.util.Map.Entry entry : REGISTRY.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOG.warn("Error shutting down YarnMetricsHelper for RM '{}'", entry.getKey(), e); + } + } + REGISTRY.clear(); + } + } + + // ------------------------------------------------------------------------- + // Per-queue cache + // ------------------------------------------------------------------------- + + /** + * Immutable cache entry holding the last-fetched metrics for one queue. + */ + private static final class CacheEntry { + final ResourceInfo info; + final long fetchedAtMs; + + CacheEntry(ResourceInfo info, long fetchedAtMs) { + this.info = info; + this.fetchedAtMs = fetchedAtMs; + } + } + + /** + * One lock per queue: only one thread performs an RM fetch at a time per queue. + * All other threads return the current (possibly stale) cached value immediately. + */ + private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap queueLocks = new ConcurrentHashMap<>(); + + // ------------------------------------------------------------------------- + // Instance state + // ------------------------------------------------------------------------- + + private final YarnClient yarnClient; + private final long refreshIntervalMs; + + // ------------------------------------------------------------------------- + // ResourceInfo — immutable snapshot + // ------------------------------------------------------------------------- + + /** + * Snapshot of YARN queue + cluster metrics, held between refreshes. + * All fields are public-final for cheap read access from rendering threads. + */ + public static final class ResourceInfo { + // Queue metrics + public final String queueName; + public final String queueUsedVCores; + public final String queueCapacityVCores; + public final String queueUsedMemoryGb; + public final String queueCapacityMemoryGb; + // Cluster-wide metrics + public final String clusterUsedVCores; + public final String clusterTotalVCores; + public final String clusterUsedMemoryGb; + public final String clusterTotalMemoryGb; + + private ResourceInfo(Builder b) { + this.queueName = b.queueName; + this.queueUsedVCores = b.queueUsedVCores; + this.queueCapacityVCores = b.queueCapacityVCores; + this.queueUsedMemoryGb = b.queueUsedMemoryGb; + this.queueCapacityMemoryGb = b.queueCapacityMemoryGb; + this.clusterUsedVCores = b.clusterUsedVCores; + this.clusterTotalVCores = b.clusterTotalVCores; + this.clusterUsedMemoryGb = b.clusterUsedMemoryGb; + this.clusterTotalMemoryGb = b.clusterTotalMemoryGb; + } + + static final class Builder { + String queueName = NA; + String queueUsedVCores = NA; + String queueCapacityVCores = NA; + String queueUsedMemoryGb = NA; + String queueCapacityMemoryGb = NA; + String clusterUsedVCores = NA; + String clusterTotalVCores = NA; + String clusterUsedMemoryGb = NA; + String clusterTotalMemoryGb = NA; + + ResourceInfo build() { + return new ResourceInfo(this); + } + } + + /** + * Format the queue resource line. + * Example: + * QUEUE (root.default ): vCores used/capacity: 40 / 200 | Memory used/capacity: 80 GB / 400 GB + */ + public String formatQueueLine() { + return String.format( + "QUEUE (%-20s): vCores used/capacity: %6s / %6s | Memory used/capacity: %6s GB / %6s GB", + queueName, queueUsedVCores, queueCapacityVCores, + queueUsedMemoryGb, queueCapacityMemoryGb); + } + + /** + * Format the cluster resource line. + * Example: + * CLUSTER : vCores used/total : 320 / 1000 | Memory used/total : 640 GB / 4000 GB + */ + public String formatClusterLine() { + return String.format( + "CLUSTER : vCores used/total : %6s / %6s | Memory used/total : %6s GB / %6s GB", + clusterUsedVCores, clusterTotalVCores, + clusterUsedMemoryGb, clusterTotalMemoryGb); + } + } + + // ------------------------------------------------------------------------- + // Constructor (package-private; use getInstance() or the test constructor) + // ------------------------------------------------------------------------- + + /** Package-private for unit tests — inject a mocked YarnClient directly. */ + YarnMetricsHelper(YarnClient yarnClient, long refreshIntervalMs) { + this.yarnClient = yarnClient; + this.refreshIntervalMs = refreshIntervalMs; + } + + private static YarnClient createAndStartYarnClient(Configuration conf) { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + return client; + } + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Returns a (possibly cached) {@link ResourceInfo} for the given queue. + * + *

Cache semantics: + *

    + *
  1. If a fresh-enough entry exists → return it immediately (no lock, no RM call).
  2. + *
  3. If the entry is stale and the per-queue lock is available → fetch fresh data, + * update the cache, return the new entry.
  4. + *
  5. If the entry is stale but another thread is already fetching (lock not available) + * → return the stale entry immediately (non-blocking). The progress monitor will + * display slightly old data for one refresh cycle rather than stalling.
  6. + *
+ * + * @param queueName YARN queue name, e.g. {@code "root.default"}. + * @return never {@code null}. + */ + public ResourceInfo getResourceInfo(String queueName) { + if (refreshIntervalMs <= 0) { + ResourceInfo.Builder b = new ResourceInfo.Builder(); + b.queueName = queueName; + return b.build(); + } + + long now = System.currentTimeMillis(); + CacheEntry current = cache.get(queueName); + + // Fast path: cache hit within interval — no locking required. + if (current != null && (now - current.fetchedAtMs) < refreshIntervalMs) { + return current.info; + } + + // Slow path: cache miss or stale. Try to acquire the per-queue lock without blocking. + ReentrantLock lock = queueLocks.computeIfAbsent(queueName, k -> new ReentrantLock()); + if (lock.tryLock()) { + try { + // Re-check after acquiring lock (another thread may have just refreshed). + current = cache.get(queueName); + if (current == null || (now - current.fetchedAtMs) >= refreshIntervalMs) { + ResourceInfo fresh = fetchFresh(queueName); + CacheEntry newEntry = new CacheEntry(fresh, System.currentTimeMillis()); + cache.put(queueName, newEntry); + return fresh; + } + return current.info; + } finally { + lock.unlock(); + } + } else { + // Another thread is fetching right now — return stale data rather than blocking. + return current != null ? current.info : new ResourceInfo.Builder().build(); + } + } + + // ------------------------------------------------------------------------- + // Internal fetch + // ------------------------------------------------------------------------- + + /** Fetches fresh metrics from the YARN RM; returns an all-N/A snapshot on failure. */ + private ResourceInfo fetchFresh(String queueName) { + ResourceInfo.Builder builder = new ResourceInfo.Builder(); + builder.queueName = queueName; + try { + populateClusterMetrics(builder); + populateQueueMetrics(builder, queueName); + } catch (YarnException | IOException e) { + LOG.warn("Failed to fetch YARN resource metrics for queue '{}': {}", queueName, + e.getMessage()); + LOG.debug("YARN metrics fetch failure detail", e); + } + return builder.build(); + } + + /** + * Populates cluster-wide vCore / memory totals via the root queue statistics. + * Both CapacityScheduler and FairScheduler expose root as the aggregate. + */ + private void populateClusterMetrics(ResourceInfo.Builder builder) + throws YarnException, IOException { + QueueInfo rootQueue = yarnClient.getQueueInfo("root"); + if (rootQueue == null) { + LOG.debug("Root queue info not available; skipping cluster metrics"); + return; + } + QueueStatistics stats = rootQueue.getQueueStatistics(); + if (stats == null) { + LOG.debug("Root queue statistics not available; skipping cluster metrics"); + return; + } + long allocated = stats.getAllocatedVCores(); + long available = stats.getAvailableVCores(); + builder.clusterUsedVCores = String.valueOf(allocated); + builder.clusterTotalVCores = String.valueOf(allocated + available); + + long allocMem = stats.getAllocatedMemoryMB(); + long availMem = stats.getAvailableMemoryMB(); + builder.clusterUsedMemoryGb = String.valueOf(allocMem / MB_PER_GB); + builder.clusterTotalMemoryGb = String.valueOf((allocMem + availMem) / MB_PER_GB); + } + + /** Populates queue-scoped vCore / memory fields. */ + private void populateQueueMetrics(ResourceInfo.Builder builder, String queueName) + throws YarnException, IOException { + QueueInfo queueInfo = yarnClient.getQueueInfo(queueName); + if (queueInfo == null) { + LOG.warn("Queue '{}' not found in YARN RM response", queueName); + return; + } + QueueStatistics stats = queueInfo.getQueueStatistics(); + if (stats == null) { + LOG.debug("Queue statistics not available for '{}'; skipping queue metrics", queueName); + return; + } + long allocated = stats.getAllocatedVCores(); + long available = stats.getAvailableVCores(); + builder.queueUsedVCores = String.valueOf(allocated); + builder.queueCapacityVCores = String.valueOf(allocated + available); + + long allocMem = stats.getAllocatedMemoryMB(); + long availMem = stats.getAvailableMemoryMB(); + builder.queueUsedMemoryGb = String.valueOf(allocMem / MB_PER_GB); + builder.queueCapacityMemoryGb = String.valueOf((allocMem + availMem) / MB_PER_GB); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + /** + * Stops the underlying YARN client. + * Do not call this directly — use {@link #shutdown()} for the singleton. + */ + @Override + public void close() { + try { + if (yarnClient != null) { + yarnClient.stop(); + } + } catch (Exception e) { + LOG.warn("Error stopping YarnClient", e); + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java index ec52f3856d51..94a0248f1737 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java @@ -32,16 +32,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -109,8 +113,38 @@ public void setupInternalStateOnObjectCreation() throws IOException, TezExceptio TezProgressMonitor.VertexProgress expectedReducerState = new TezProgressMonitor.VertexProgress(3, 2, 1, 0, 1, DAGStatus.State.RUNNING); assertThat(monitor.progressCountsMap.get(REDUCER), is(equalTo(expectedReducerState))); + } + + @Test + public void getResourceLinesEmptyWhenNoHelperProvided() throws IOException, TezException { + when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + TezProgressMonitor monitor = + new TezProgressMonitor(dagClient, dagStatus, new ArrayList(), progressMap(), + console, Long.MAX_VALUE); + // No YarnMetricsHelper injected → getResourceLines() must return empty list (no NPE) + assertNotNull(monitor.getResourceLines()); + assertEquals(0, monitor.getResourceLines().size()); } + @Test + public void getResourceLinesDelegatesToHelper() throws IOException, TezException { + when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + + YarnMetricsHelper mockHelper = mock(YarnMetricsHelper.class); + YarnMetricsHelper.ResourceInfo.Builder b = new YarnMetricsHelper.ResourceInfo.Builder(); + b.queueName = "root.default"; + YarnMetricsHelper.ResourceInfo fakeInfo = b.build(); + when(mockHelper.getResourceInfo(eq("root.default"))).thenReturn(fakeInfo); + + TezProgressMonitor monitor = + new TezProgressMonitor(dagClient, dagStatus, new ArrayList(), progressMap(), + console, Long.MAX_VALUE, mockHelper, "root.default"); + + List lines = monitor.getResourceLines(); + assertNotNull(lines); + assertEquals(2, lines.size()); // queue line + cluster line + verify(mockHelper).getResourceInfo(eq("root.default")); + } } \ No newline at end of file diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestYarnMetricsHelper.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestYarnMetricsHelper.java new file mode 100644 index 000000000000..e779b23e1b19 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestYarnMetricsHelper.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * Unit tests for {@link YarnMetricsHelper} (HIVE-27126). + * Uses a mocked YarnClient — no live YARN cluster required. + * Singleton isolation is guaranteed by {@link YarnMetricsHelper#shutdown()} in @After. + */ +@RunWith(MockitoJUnitRunner.class) +public class TestYarnMetricsHelper { + + private static final String QUEUE_NAME = "root.default"; + + @Mock private YarnClient yarnClient; + @Mock private QueueInfo rootQueueInfo; + @Mock private QueueInfo queueInfo; + @Mock private QueueStatistics rootStats; + @Mock private QueueStatistics queueStats; + + @After + public void tearDown() { + YarnMetricsHelper.shutdown(); + } + + @Before + public void setUp() throws YarnException, IOException { + when(rootQueueInfo.getQueueStatistics()).thenReturn(rootStats); + when(rootStats.getAllocatedVCores()).thenReturn(320L); + when(rootStats.getAvailableVCores()).thenReturn(680L); + when(rootStats.getAllocatedMemoryMB()).thenReturn(655360L); + when(rootStats.getAvailableMemoryMB()).thenReturn(3440640L); + when(yarnClient.getQueueInfo(eq("root"))).thenReturn(rootQueueInfo); + + when(queueInfo.getQueueStatistics()).thenReturn(queueStats); + when(queueStats.getAllocatedVCores()).thenReturn(40L); + when(queueStats.getAvailableVCores()).thenReturn(160L); + when(queueStats.getAllocatedMemoryMB()).thenReturn(81920L); + when(queueStats.getAvailableMemoryMB()).thenReturn(327680L); + when(yarnClient.getQueueInfo(eq(QUEUE_NAME))).thenReturn(queueInfo); + } + + // ---- cluster metrics ---------------------------------------------------- + + @Test + public void testClusterMetrics() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + YarnMetricsHelper.ResourceInfo info = h.getResourceInfo(QUEUE_NAME); + assertNotNull(info); + assertEquals("320", info.clusterUsedVCores); + assertEquals("1000", info.clusterTotalVCores); + assertEquals("640", info.clusterUsedMemoryGb); + assertEquals("4000", info.clusterTotalMemoryGb); + } + + // ---- queue metrics ------------------------------------------------------- + + @Test + public void testQueueMetrics() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + YarnMetricsHelper.ResourceInfo info = h.getResourceInfo(QUEUE_NAME); + assertNotNull(info); + assertEquals(QUEUE_NAME, info.queueName); + assertEquals("40", info.queueUsedVCores); + assertEquals("200", info.queueCapacityVCores); + assertEquals("80", info.queueUsedMemoryGb); + assertEquals("400", info.queueCapacityMemoryGb); + } + + // ---- caching ------------------------------------------------------------- + + @Test + public void testResultCachedWithinInterval() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + assertSame("second call must return cached object", + h.getResourceInfo(QUEUE_NAME), h.getResourceInfo(QUEUE_NAME)); + verify(yarnClient, atMost(1)).getQueueInfo(eq("root")); + verify(yarnClient, atMost(1)).getQueueInfo(eq(QUEUE_NAME)); + } + + @Test + public void testCacheRefreshedAfterInterval() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 1L); + h.getResourceInfo(QUEUE_NAME); + Thread.sleep(5); + h.getResourceInfo(QUEUE_NAME); + verify(yarnClient, atLeast(2)).getQueueInfo(eq(QUEUE_NAME)); + } + + @Test + public void testDifferentQueuesAreCachedIndependently() throws Exception { + when(yarnClient.getQueueInfo(eq("root.other"))).thenReturn(null); + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + assertEquals("40", h.getResourceInfo(QUEUE_NAME).queueUsedVCores); + assertEquals(YarnMetricsHelper.NA, h.getResourceInfo("root.other").queueUsedVCores); + } + + // ---- disabled ------------------------------------------------------------ + + @Test + public void testDisabledWhenIntervalIsZero() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 0L); + YarnMetricsHelper.ResourceInfo info = h.getResourceInfo(QUEUE_NAME); + assertEquals(YarnMetricsHelper.NA, info.clusterUsedVCores); + assertEquals(YarnMetricsHelper.NA, info.queueUsedVCores); + verify(yarnClient, atMost(0)).getQueueInfo(eq("root")); + } + + // ---- error handling ------------------------------------------------------ + + @Test + public void testAllNaWhenYarnRmThrows() throws Exception { + when(yarnClient.getQueueInfo(eq("root"))).thenThrow(new YarnException("RM unavailable")); + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 100L); + YarnMetricsHelper.ResourceInfo info = h.getResourceInfo(QUEUE_NAME); + assertNotNull(info); + assertEquals(YarnMetricsHelper.NA, info.clusterUsedVCores); + assertEquals(YarnMetricsHelper.NA, info.queueUsedVCores); + } + + @Test + public void testNaWhenQueueNotFound() throws Exception { + when(yarnClient.getQueueInfo(eq(QUEUE_NAME))).thenReturn(null); + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 100L); + YarnMetricsHelper.ResourceInfo info = h.getResourceInfo(QUEUE_NAME); + assertEquals("320", info.clusterUsedVCores); + assertEquals(YarnMetricsHelper.NA, info.queueUsedVCores); + } + + @Test + public void testNaWhenQueueStatsNull() throws Exception { + when(queueInfo.getQueueStatistics()).thenReturn(null); + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 100L); + assertEquals(YarnMetricsHelper.NA, h.getResourceInfo(QUEUE_NAME).queueUsedVCores); + } + + // ---- queue name resolution ----------------------------------------------- + + // Queue name is read directly from TezSession.getQueueName() in TezJobMonitor — + // no utility method exists; these tests verify null/empty safety inline behaviour. + + @Test + public void testSessionQueueNameUsedDirectly() { + // Verifies the pattern used in TezJobMonitor constructor: + // String q = session.getQueueName(); + // yarnQueueName = (q != null && !q.isEmpty()) ? q : "default"; + org.apache.hadoop.hive.ql.exec.tez.TezSession session = + mock(org.apache.hadoop.hive.ql.exec.tez.TezSession.class); + when(session.getQueueName()).thenReturn("root.engineering"); + String q = session.getQueueName(); + assertEquals("root.engineering", (q != null && !q.isEmpty()) ? q : "default"); + } + + @Test + public void testSessionQueueNameFallsBackToDefaultWhenNull() { + org.apache.hadoop.hive.ql.exec.tez.TezSession session = + mock(org.apache.hadoop.hive.ql.exec.tez.TezSession.class); + when(session.getQueueName()).thenReturn(null); + String q = session.getQueueName(); + assertEquals("default", (q != null && !q.isEmpty()) ? q : "default"); + } + + @Test + public void testSessionQueueNameFallsBackToDefaultWhenEmpty() { + org.apache.hadoop.hive.ql.exec.tez.TezSession session = + mock(org.apache.hadoop.hive.ql.exec.tez.TezSession.class); + when(session.getQueueName()).thenReturn(""); + String q = session.getQueueName(); + assertEquals("default", (q != null && !q.isEmpty()) ? q : "default"); + } + + // ---- formatting ---------------------------------------------------------- + + @Test + public void testFormatQueueLine() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + String line = h.getResourceInfo(QUEUE_NAME).formatQueueLine(); + assertNotNull(line); + assertEquals(true, line.contains(QUEUE_NAME)); + assertEquals(true, line.contains("40")); + assertEquals(true, line.contains("200")); + } + + @Test + public void testFormatClusterLine() throws Exception { + YarnMetricsHelper h = new YarnMetricsHelper(yarnClient, 5000L); + String line = h.getResourceInfo(QUEUE_NAME).formatClusterLine(); + assertNotNull(line); + assertEquals(true, line.contains("320")); + assertEquals(true, line.contains("1000")); + } + + // ---- singleton ----------------------------------------------------------- + + @Test + public void testGetInstanceReturnsNullWhenDisabled() { + HiveConf conf = new HiveConf(); + conf.set("hive.tez.yarn.metrics.refresh.interval", "0ms"); + assertNull(YarnMetricsHelper.getInstance(conf)); + } + + @Test + public void testShutdownIsIdempotent() { + YarnMetricsHelper.shutdown(); + YarnMetricsHelper.shutdown(); + } + + // ---- per-RM registry ----------------------------------------------------- + + @Test + public void testResolveRmKeyNonHa() { + HiveConf conf = new HiveConf(); + conf.set("yarn.resourcemanager.address", "rm1.example.com:8032"); + assertEquals("rm1.example.com:8032", YarnMetricsHelper.resolveRmKey(conf)); + } + + @Test + public void testResolveRmKeyHa() { + HiveConf conf = new HiveConf(); + conf.setBoolean("yarn.resourcemanager.ha.enabled", true); + conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2"); + assertEquals("ha:rm1,rm2", YarnMetricsHelper.resolveRmKey(conf)); + } + + @Test + public void testResolveRmKeyDefaultWhenNotSet() { + HiveConf conf = new HiveConf(); + // No RM address set — should return the YarnConfiguration default + String key = YarnMetricsHelper.resolveRmKey(conf); + assertNotNull(key); + // Must not be empty — we need something to key the registry on + assertEquals(false, key.isEmpty()); + } + + @Test + public void testSameRmAddressReturnsSameInstance() { + HiveConf conf1 = new HiveConf(); + conf1.set("yarn.resourcemanager.address", "rm1.example.com:8032"); + conf1.set("hive.tez.yarn.metrics.refresh.interval", "5s"); + + HiveConf conf2 = new HiveConf(); + conf2.set("yarn.resourcemanager.address", "rm1.example.com:8032"); + conf2.set("hive.tez.yarn.metrics.refresh.interval", "5s"); + + // We can't call getInstance() here without a live YarnClient being created, + // but we can verify that resolveRmKey produces the same key for both confs, + // which is the contract that drives the registry lookup. + assertEquals(YarnMetricsHelper.resolveRmKey(conf1), YarnMetricsHelper.resolveRmKey(conf2)); + } + + @Test + public void testDifferentRmAddressProducesDifferentKey() { + HiveConf conf1 = new HiveConf(); + conf1.set("yarn.resourcemanager.address", "rm1.example.com:8032"); + + HiveConf conf2 = new HiveConf(); + conf2.set("yarn.resourcemanager.address", "rm2.example.com:8032"); + + assertNotNull(YarnMetricsHelper.resolveRmKey(conf1)); + assertNotNull(YarnMetricsHelper.resolveRmKey(conf2)); + assertEquals(false, + YarnMetricsHelper.resolveRmKey(conf1).equals(YarnMetricsHelper.resolveRmKey(conf2))); + } +}