From 399c0b34c1433e10bfe09c68efc81025b8732d7d Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Wed, 17 Apr 2024 11:05:55 -0400 Subject: [PATCH 1/7] Add compaction IT that verifies queues are cleared when tablets no longer need to compact --- .../CompactionPriorityQueueMetricsIT.java | 163 +++++++++++++++++- 1 file changed, 158 insertions(+), 5 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 800ceffbcd6..d634cd5894d 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -72,6 +72,7 @@ import org.apache.accumulo.test.functional.CompactionIT; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -282,11 +283,8 @@ public void testQueueMetrics() throws Exception { fs.mkdirs(new Path(dir)); // Create splits so there are two groupings of tablets with similar file counts. - List splitPoints = - List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000"); - for (String splitPoint : splitPoints) { - addSplits(c, tableName, splitPoint); - } + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); for (int i = 0; i < 100; i++) { writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); @@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception { shutdownTailer.set(true); thread.join(); } + + @Test + public void newTest() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + + Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + thread.start(); + + long highestFileCount = 0L; + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two groupings of tablets with similar file counts. + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); + iterSetting.addOption("expectedQ", QUEUE1); + iterSetting.addOption("modulus", 3 + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { + // Get each tablet's file sizes + for (TabletMetadata tablet : tm) { + long fileSize = tablet.getFiles().size(); + log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); + highestFileCount = Math.max(highestFileCount, fileSize); + } + } + verifyData(c, tableName, 0, 100 * 100 - 1, false); + } + + boolean sawMetricsQ1 = false; + while (!sawMetricsQ1) { + while (!queueMetrics.isEmpty()) { + var qm = queueMetrics.take(); + if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(qm.getValue()) > 0) { + sawMetricsQ1 = true; + } + } + } + // Current poll rate of the TestStatsDRegistryFactory is 3 seconds + // If metrics are not found in the queue, sleep until the next poll. + UtilWaitThread.sleep(3500); + } + + // Set lowest priority to the lowest possible system compaction priority + long lowestPriority = Short.MIN_VALUE; + long rejectedCount = 0L; + int queueSize = 0; + + boolean sawQueues = false; + // An empty queue means that the last known value is the most recent. + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + rejectedCount = Long.parseLong(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueSize = Integer.parseInt(metric.getValue()); + } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { + sawQueues = true; + } else { + log.debug("{}", metric); + } + } + + // Confirm metrics were generated and in some cases, validate contents. + assertTrue(rejectedCount > 0L); + + // Priority is the file counts + number of compactions for that tablet. + // The lowestPriority job in the queue should have been + // at least 1 count higher than the highest file count. + short highestFileCountPrio = CompactionJobPrioritizer.createPriority( + getCluster().getServerContext().getTableId(tableName), CompactionKind.USER, + (int) highestFileCount, 0); + assertTrue(lowestPriority > highestFileCountPrio, + lowestPriority + " " + highestFileCount + " " + highestFileCountPrio); + + // Multiple Queues have been created + assertTrue(sawQueues); + + // Queue size matches the intended queue size + assertEquals(QUEUE1_SIZE, queueSize); + + // change compactor settings so that compactions no longer need to run + context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); + context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000"); + + // wait for queue to clear + Wait.waitFor(() -> { + int jobsQueued = QUEUE1_SIZE; + int queueLength = 0; + long dequeued = 0; + long rejected = 0; + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + jobsQueued = Integer.parseInt(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueLength = Integer.parseInt(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + rejected = Long.parseLong(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + dequeued = Long.parseLong(metric.getValue()); + } + } + log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength, + jobsQueued, dequeued, rejected); + return jobsQueued == 0; + }, 120_000, 3500, "Queue did not clear in time"); + } + } From d7023d0ca5c5ebf8b7b0df2139ddfe60d19032a4 Mon Sep 17 00:00:00 2001 From: Dom G Date: Thu, 18 Apr 2024 11:43:39 -0400 Subject: [PATCH 2/7] Apply suggestions from code review Co-authored-by: Keith Turner --- .../test/compaction/CompactionPriorityQueueMetricsIT.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index d634cd5894d..04afe5d009c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -447,9 +447,6 @@ public void newTest() throws Exception { IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); iterSetting.addOption("expectedQ", QUEUE1); iterSetting.addOption("modulus", 3 + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); - c.tableOperations().compact(tableName, config); try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { // Get each tablet's file sizes @@ -513,7 +510,7 @@ public void newTest() throws Exception { // The lowestPriority job in the queue should have been // at least 1 count higher than the highest file count. short highestFileCountPrio = CompactionJobPrioritizer.createPriority( - getCluster().getServerContext().getTableId(tableName), CompactionKind.USER, + getCluster().getServerContext().getTableId(tableName), CompactionKind.SYSTEM, (int) highestFileCount, 0); assertTrue(lowestPriority > highestFileCountPrio, lowestPriority + " " + highestFileCount + " " + highestFileCountPrio); From 6a7ec2dd0ff88d07c1fd6bfd2525c5fb281488a9 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 18 Apr 2024 12:23:58 -0400 Subject: [PATCH 3/7] remove unneeded code copied from other test case --- .../CompactionPriorityQueueMetricsIT.java | 46 ++++--------------- 1 file changed, 9 insertions(+), 37 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 04afe5d009c..3e672e51044 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -112,7 +113,8 @@ public void setupMetricsTest() throws Exception { Map props = Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props) + .withInitialTabletAvailability(TabletAvailability.HOSTED); c.tableOperations().create(tableName, ntc); tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); @@ -408,16 +410,16 @@ public void testQueueMetrics() throws Exception { @Test public void newTest() throws Exception { - // Metrics collector Thread + + // Metrics collector Thread setup final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); final AtomicBoolean shutdownTailer = new AtomicBoolean(false); - Thread thread = Threads.createThread("metric-tailer", () -> { - while (!shutdownTailer.get()) { + while (true) { List statsDMetrics = sink.getLines(); for (String s : statsDMetrics) { if (shutdownTailer.get()) { - break; + return; } if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); @@ -427,7 +429,6 @@ public void newTest() throws Exception { }); thread.start(); - long highestFileCount = 0L; ServerContext context = getCluster().getServerContext(); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -444,16 +445,11 @@ public void newTest() throws Exception { } c.tableOperations().importDirectory(dir).to(tableName).load(); - IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); - iterSetting.addOption("expectedQ", QUEUE1); - iterSetting.addOption("modulus", 3 + ""); - try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { // Get each tablet's file sizes for (TabletMetadata tablet : tm) { long fileSize = tablet.getFiles().size(); log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); - highestFileCount = Math.max(highestFileCount, fileSize); } } verifyData(c, tableName, 0, 100 * 100 - 1, false); @@ -475,25 +471,13 @@ public void newTest() throws Exception { UtilWaitThread.sleep(3500); } - // Set lowest priority to the lowest possible system compaction priority - long lowestPriority = Short.MIN_VALUE; - long rejectedCount = 0L; int queueSize = 0; - boolean sawQueues = false; + // An empty queue means that the last known value is the most recent. while (!queueMetrics.isEmpty()) { var metric = queueMetrics.take(); - if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - rejectedCount = Long.parseLong(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { queueSize = Integer.parseInt(metric.getValue()); } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { @@ -503,18 +487,6 @@ public void newTest() throws Exception { } } - // Confirm metrics were generated and in some cases, validate contents. - assertTrue(rejectedCount > 0L); - - // Priority is the file counts + number of compactions for that tablet. - // The lowestPriority job in the queue should have been - // at least 1 count higher than the highest file count. - short highestFileCountPrio = CompactionJobPrioritizer.createPriority( - getCluster().getServerContext().getTableId(tableName), CompactionKind.SYSTEM, - (int) highestFileCount, 0); - assertTrue(lowestPriority > highestFileCountPrio, - lowestPriority + " " + highestFileCount + " " + highestFileCountPrio); - // Multiple Queues have been created assertTrue(sawQueues); From bd4d08ee90008a288d5b637fbe298762c201f3ef Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 18 Apr 2024 12:38:58 -0400 Subject: [PATCH 4/7] ensure compactors are stopped before each test --- .../test/compaction/CompactionPriorityQueueMetricsIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 3e672e51044..96c3f803f99 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -107,6 +108,8 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { @BeforeEach public void setupMetricsTest() throws Exception { + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty()); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { tableName = getUniqueNames(1)[0]; From 240c31044d330a693844562607b3d20d9daeb8ce Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 18 Apr 2024 13:18:07 -0400 Subject: [PATCH 5/7] Avoid race condition. other cleanup --- .../CompactionPriorityQueueMetricsIT.java | 59 +++++++++---------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 96c3f803f99..b20fd5e5c43 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -411,8 +411,11 @@ public void testQueueMetrics() throws Exception { thread.join(); } + /** + * Test that the compaction queue is cleared when compactions no longer need to happen. + */ @Test - public void newTest() throws Exception { + public void testCompactionQueueClearedWhenNotNeeded() throws Exception { // Metrics collector Thread setup final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); @@ -452,49 +455,44 @@ public void newTest() throws Exception { // Get each tablet's file sizes for (TabletMetadata tablet : tm) { long fileSize = tablet.getFiles().size(); - log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); + log.debug("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); } } verifyData(c, tableName, 0, 100 * 100 - 1, false); } - boolean sawMetricsQ1 = false; - while (!sawMetricsQ1) { + final int sleepMillis = 3500; // Current poll rate of the TestStatsDRegistryFactory is 3 seconds + + Wait.waitFor(() -> { while (!queueMetrics.isEmpty()) { var qm = queueMetrics.take(); if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { if (Integer.parseInt(qm.getValue()) > 0) { - sawMetricsQ1 = true; + return true; } } } - // Current poll rate of the TestStatsDRegistryFactory is 3 seconds - // If metrics are not found in the queue, sleep until the next poll. - UtilWaitThread.sleep(3500); - } + return false; + }, 60_000, sleepMillis, "did not see Q1 metrics"); - int queueSize = 0; - boolean sawQueues = false; - - // An empty queue means that the last known value is the most recent. - while (!queueMetrics.isEmpty()) { - var metric = queueMetrics.take(); - if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - queueSize = Integer.parseInt(metric.getValue()); - } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { - sawQueues = true; - } else { - log.debug("{}", metric); + Wait.waitFor(() -> { + int queueSize = 0; + boolean sawQueues = false; + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueSize = Integer.parseInt(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { + sawQueues = true; + } else { + log.debug("{}", metric); + } } - } - - // Multiple Queues have been created - assertTrue(sawQueues); - - // Queue size matches the intended queue size - assertEquals(QUEUE1_SIZE, queueSize); + return queueSize == QUEUE1_SIZE && sawQueues; + }, 60_000, sleepMillis, "did not see the expected number of queued compactions"); // change compactor settings so that compactions no longer need to run context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); @@ -529,7 +527,8 @@ public void newTest() throws Exception { log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength, jobsQueued, dequeued, rejected); return jobsQueued == 0; - }, 120_000, 3500, "Queue did not clear in time"); + }, 60_000, sleepMillis, + "expected job queue to be cleared once compactions no longer need to happen"); } } From 20a77e3d99be50c1c91a96c112cfdb5d0966e38e Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 30 Apr 2024 14:24:30 -0400 Subject: [PATCH 6/7] Code review --- .../CompactionPriorityQueueMetricsIT.java | 79 ++++++++----------- .../metrics/TestStatsDRegistryFactory.java | 4 +- 2 files changed, 36 insertions(+), 47 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index b20fd5e5c43..2d44d14d5f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -81,6 +81,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -106,6 +107,11 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { public static final String QUEUE1_SERVICE = "Q1"; public static final int QUEUE1_SIZE = 6; + // Metrics collector Thread + final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + Thread metricsTailer; + @BeforeEach public void setupMetricsTest() throws Exception { getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); @@ -125,6 +131,28 @@ public void setupMetricsTest() throws Exception { fs = getCluster().getFileSystem(); rootPath = getCluster().getTemporaryPath().toString(); } + queueMetrics.clear(); + shutdownTailer.set(false); + metricsTailer = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + metricsTailer.start(); + } + + @AfterEach + public void teardownMetricsTest() throws Exception { + shutdownTailer.set(true); + metricsTailer.join(); } private String getDir(String testName) throws Exception { @@ -260,24 +288,6 @@ private static String row(int r) { @Test public void testQueueMetrics() throws Exception { - // Metrics collector Thread - final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); - final AtomicBoolean shutdownTailer = new AtomicBoolean(false); - - Thread thread = Threads.createThread("metric-tailer", () -> { - while (!shutdownTailer.get()) { - List statsDMetrics = sink.getLines(); - for (String s : statsDMetrics) { - if (shutdownTailer.get()) { - break; - } - if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { - queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); - } - } - } - }); - thread.start(); long highestFileCount = 0L; ServerContext context = getCluster().getServerContext(); @@ -325,9 +335,8 @@ public void testQueueMetrics() throws Exception { } } } - // Current poll rate of the TestStatsDRegistryFactory is 3 seconds // If metrics are not found in the queue, sleep until the next poll. - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); } // Set lowest priority to the lowest possible system compaction priority @@ -383,7 +392,7 @@ public void testQueueMetrics() throws Exception { boolean emptyQueue = false; // Make sure that metrics added to the queue are recent - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); while (!emptyQueue) { while (!queueMetrics.isEmpty()) { @@ -404,11 +413,8 @@ public void testQueueMetrics() throws Exception { } } } - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); } - - shutdownTailer.set(true); - thread.join(); } /** @@ -416,25 +422,6 @@ public void testQueueMetrics() throws Exception { */ @Test public void testCompactionQueueClearedWhenNotNeeded() throws Exception { - - // Metrics collector Thread setup - final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); - final AtomicBoolean shutdownTailer = new AtomicBoolean(false); - Thread thread = Threads.createThread("metric-tailer", () -> { - while (true) { - List statsDMetrics = sink.getLines(); - for (String s : statsDMetrics) { - if (shutdownTailer.get()) { - return; - } - if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { - queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); - } - } - } - }); - thread.start(); - ServerContext context = getCluster().getServerContext(); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -461,7 +448,7 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception { verifyData(c, tableName, 0, 100 * 100 - 1, false); } - final int sleepMillis = 3500; // Current poll rate of the TestStatsDRegistryFactory is 3 seconds + final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis(); Wait.waitFor(() -> { while (!queueMetrics.isEmpty()) { @@ -488,7 +475,7 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception { .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { sawQueues = true; } else { - log.debug("{}", metric); + log.debug("Other metric not used in the test: {}", metric); } } return queueSize == QUEUE1_SIZE && sawQueues; diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java index 6c26fabb4d8..7f37c6cfbcc 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java @@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory { public static final String SERVER_HOST = "test.meter.registry.host"; public static final String SERVER_PORT = "test.meter.registry.port"; + public static final Duration pollingFrequency = Duration.ofSeconds(3); + @Override public MeterRegistry create() { @@ -77,7 +79,7 @@ public StatsdProtocol protocol() { @Override public Duration pollingFrequency() { - return Duration.ofSeconds(3); + return pollingFrequency; } @Override From 9a63f3e643bb636afea4e64fc478fa5bcfab67b4 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Fri, 10 May 2024 11:24:02 -0400 Subject: [PATCH 7/7] Simplify test case --- .../CompactionPriorityQueueMetricsIT.java | 92 +++++-------------- 1 file changed, 25 insertions(+), 67 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 2d44d14d5f1..7af245fde3c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -437,85 +437,43 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception { writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); } c.tableOperations().importDirectory(dir).to(tableName).load(); - - try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { - // Get each tablet's file sizes - for (TabletMetadata tablet : tm) { - long fileSize = tablet.getFiles().size(); - log.debug("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); - } - } verifyData(c, tableName, 0, 100 * 100 - 1, false); } final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis(); - Wait.waitFor(() -> { - while (!queueMetrics.isEmpty()) { - var qm = queueMetrics.take(); - if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) - && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - if (Integer.parseInt(qm.getValue()) > 0) { - return true; - } - } - } - return false; - }, 60_000, sleepMillis, "did not see Q1 metrics"); - - Wait.waitFor(() -> { - int queueSize = 0; - boolean sawQueues = false; - while (!queueMetrics.isEmpty()) { - var metric = queueMetrics.take(); - if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - queueSize = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { - sawQueues = true; - } else { - log.debug("Other metric not used in the test: {}", metric); - } - } - return queueSize == QUEUE1_SIZE && sawQueues; - }, 60_000, sleepMillis, "did not see the expected number of queued compactions"); + // wait for compaction jobs to be queued + Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis, + "Expected to see compaction jobs queued"); // change compactor settings so that compactions no longer need to run context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000"); // wait for queue to clear - Wait.waitFor(() -> { - int jobsQueued = QUEUE1_SIZE; - int queueLength = 0; - long dequeued = 0; - long rejected = 0; - while (!queueMetrics.isEmpty()) { - var metric = queueMetrics.take(); - if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - jobsQueued = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - queueLength = Integer.parseInt(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - rejected = Long.parseLong(metric.getValue()); - } else if (metric.getName() - .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED) - && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { - dequeued = Long.parseLong(metric.getValue()); - } + Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis, + "Expected job queue to be cleared once compactions no longer need to happen"); + } + + /** + * @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found. + */ + private int getJobsQueued() throws InterruptedException { + Integer jobsQueued = null; + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + jobsQueued = Integer.parseInt(metric.getValue()); } - log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength, - jobsQueued, dequeued, rejected); - return jobsQueued == 0; - }, 60_000, sleepMillis, - "expected job queue to be cleared once compactions no longer need to happen"); + } + if (jobsQueued == null) { + log.warn("No compaction job queue metrics found."); + return -1; + } + log.info("Jobs Queued: {}", jobsQueued); + return jobsQueued; } }