diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 800ceffbcd6..7af245fde3c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -72,12 +74,14 @@ import org.apache.accumulo.test.functional.CompactionIT; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -103,15 +107,23 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { public static final String QUEUE1_SERVICE = "Q1"; public static final int QUEUE1_SIZE = 6; + // Metrics collector Thread + final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + Thread metricsTailer; + @BeforeEach public void setupMetricsTest() throws Exception { + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty()); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { tableName = getUniqueNames(1)[0]; Map props = Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE); - NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props) + .withInitialTabletAvailability(TabletAvailability.HOSTED); c.tableOperations().create(tableName, ntc); tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); @@ -119,6 +131,28 @@ public void setupMetricsTest() throws Exception { fs = getCluster().getFileSystem(); rootPath = getCluster().getTemporaryPath().toString(); } + queueMetrics.clear(); + shutdownTailer.set(false); + metricsTailer = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + metricsTailer.start(); + } + + @AfterEach + public void teardownMetricsTest() throws Exception { + shutdownTailer.set(true); + metricsTailer.join(); } private String getDir(String testName) throws Exception { @@ -254,24 +288,6 @@ private static String row(int r) { @Test public void testQueueMetrics() throws Exception { - // Metrics collector Thread - final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); - final AtomicBoolean shutdownTailer = new AtomicBoolean(false); - - Thread thread = Threads.createThread("metric-tailer", () -> { - while (!shutdownTailer.get()) { - List statsDMetrics = sink.getLines(); - for (String s : statsDMetrics) { - if (shutdownTailer.get()) { - break; - } - if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { - queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); - } - } - } - }); - thread.start(); long highestFileCount = 0L; ServerContext context = getCluster().getServerContext(); @@ -282,11 +298,8 @@ public void testQueueMetrics() throws Exception { fs.mkdirs(new Path(dir)); // Create splits so there are two groupings of tablets with similar file counts. - List splitPoints = - List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000"); - for (String splitPoint : splitPoints) { - addSplits(c, tableName, splitPoint); - } + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); for (int i = 0; i < 100; i++) { writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); @@ -322,9 +335,8 @@ public void testQueueMetrics() throws Exception { } } } - // Current poll rate of the TestStatsDRegistryFactory is 3 seconds // If metrics are not found in the queue, sleep until the next poll. - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); } // Set lowest priority to the lowest possible system compaction priority @@ -380,7 +392,7 @@ public void testQueueMetrics() throws Exception { boolean emptyQueue = false; // Make sure that metrics added to the queue are recent - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); while (!emptyQueue) { while (!queueMetrics.isEmpty()) { @@ -401,10 +413,67 @@ public void testQueueMetrics() throws Exception { } } } - UtilWaitThread.sleep(3500); + UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis()); + } + } + + /** + * Test that the compaction queue is cleared when compactions no longer need to happen. + */ + @Test + public void testCompactionQueueClearedWhenNotNeeded() throws Exception { + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two groupings of tablets with similar file counts. + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; + addSplits(c, tableName, splitString); + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + verifyData(c, tableName, 0, 100 * 100 - 1, false); } - shutdownTailer.set(true); - thread.join(); + final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis(); + + // wait for compaction jobs to be queued + Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis, + "Expected to see compaction jobs queued"); + + // change compactor settings so that compactions no longer need to run + context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); + context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000"); + + // wait for queue to clear + Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis, + "Expected job queue to be cleared once compactions no longer need to happen"); } + + /** + * @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found. + */ + private int getJobsQueued() throws InterruptedException { + Integer jobsQueued = null; + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + jobsQueued = Integer.parseInt(metric.getValue()); + } + } + if (jobsQueued == null) { + log.warn("No compaction job queue metrics found."); + return -1; + } + log.info("Jobs Queued: {}", jobsQueued); + return jobsQueued; + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java index 8715a40c00d..4c2261d6a2c 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java @@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory { public static final String SERVER_HOST = "test.meter.registry.host"; public static final String SERVER_PORT = "test.meter.registry.port"; + public static final Duration pollingFrequency = Duration.ofSeconds(3); + @Override public MeterRegistry create(final InitParameters params) { LOG.info("starting metrics registration."); @@ -77,7 +79,7 @@ public StatsdProtocol protocol() { @Override public Duration pollingFrequency() { - return Duration.ofSeconds(3); + return pollingFrequency; } @Override