Skip to content

Commit

Permalink
Add compaction IT that verifies queues are cleared when tablets no lo…
Browse files Browse the repository at this point in the history
…nger need to compact (#4466)

* Add compaction IT that verifies queues are cleared when tablets no longer need to compact
  • Loading branch information
DomGarguilo authored May 15, 2024
1 parent 39c9271 commit ff951d7
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.compaction;

import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -42,6 +43,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -72,12 +74,14 @@
import org.apache.accumulo.test.functional.CompactionIT;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -103,22 +107,52 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
public static final String QUEUE1_SERVICE = "Q1";
public static final int QUEUE1_SIZE = 6;

// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
Thread metricsTailer;

@BeforeEach
public void setupMetricsTest() throws Exception {
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty());
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
tableName = getUniqueNames(1)[0];

Map<String,String> props =
Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
"table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
.withInitialTabletAvailability(TabletAvailability.HOSTED);
c.tableOperations().create(tableName, ntc);

tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
aconf = getCluster().getServerContext().getConfiguration();
fs = getCluster().getFileSystem();
rootPath = getCluster().getTemporaryPath().toString();
}
queueMetrics.clear();
shutdownTailer.set(false);
metricsTailer = Threads.createThread("metric-tailer", () -> {
while (!shutdownTailer.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (shutdownTailer.get()) {
break;
}
if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
}
}
}
});
metricsTailer.start();
}

@AfterEach
public void teardownMetricsTest() throws Exception {
shutdownTailer.set(true);
metricsTailer.join();
}

private String getDir(String testName) throws Exception {
Expand Down Expand Up @@ -254,24 +288,6 @@ private static String row(int r) {

@Test
public void testQueueMetrics() throws Exception {
// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);

Thread thread = Threads.createThread("metric-tailer", () -> {
while (!shutdownTailer.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (shutdownTailer.get()) {
break;
}
if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
}
}
}
});
thread.start();

long highestFileCount = 0L;
ServerContext context = getCluster().getServerContext();
Expand All @@ -282,11 +298,8 @@ public void testQueueMetrics() throws Exception {
fs.mkdirs(new Path(dir));

// Create splits so there are two groupings of tablets with similar file counts.
List<String> splitPoints =
List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000");
for (String splitPoint : splitPoints) {
addSplits(c, tableName, splitPoint);
}
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);

for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
Expand Down Expand Up @@ -322,9 +335,8 @@ public void testQueueMetrics() throws Exception {
}
}
}
// Current poll rate of the TestStatsDRegistryFactory is 3 seconds
// If metrics are not found in the queue, sleep until the next poll.
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}

// Set lowest priority to the lowest possible system compaction priority
Expand Down Expand Up @@ -380,7 +392,7 @@ public void testQueueMetrics() throws Exception {
boolean emptyQueue = false;

// Make sure that metrics added to the queue are recent
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());

while (!emptyQueue) {
while (!queueMetrics.isEmpty()) {
Expand All @@ -401,10 +413,67 @@ public void testQueueMetrics() throws Exception {
}
}
}
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}
}

/**
* Test that the compaction queue is cleared when compactions no longer need to happen.
*/
@Test
public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
ServerContext context = getCluster().getServerContext();
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {

String dir = getDir("/testBulkFile-");
FileSystem fs = getCluster().getFileSystem();
fs.mkdirs(new Path(dir));

// Create splits so there are two groupings of tablets with similar file counts.
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);

for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
}
c.tableOperations().importDirectory(dir).to(tableName).load();
verifyData(c, tableName, 0, 100 * 100 - 1, false);
}

shutdownTailer.set(true);
thread.join();
final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis();

// wait for compaction jobs to be queued
Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis,
"Expected to see compaction jobs queued");

// change compactor settings so that compactions no longer need to run
context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000");
context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000");

// wait for queue to clear
Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
"Expected job queue to be cleared once compactions no longer need to happen");
}

/**
* @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found.
*/
private int getJobsQueued() throws InterruptedException {
Integer jobsQueued = null;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
jobsQueued = Integer.parseInt(metric.getValue());
}
}
if (jobsQueued == null) {
log.warn("No compaction job queue metrics found.");
return -1;
}
log.info("Jobs Queued: {}", jobsQueued);
return jobsQueued;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory {
public static final String SERVER_HOST = "test.meter.registry.host";
public static final String SERVER_PORT = "test.meter.registry.port";

public static final Duration pollingFrequency = Duration.ofSeconds(3);

@Override
public MeterRegistry create(final InitParameters params) {
LOG.info("starting metrics registration.");
Expand Down Expand Up @@ -77,7 +79,7 @@ public StatsdProtocol protocol() {

@Override
public Duration pollingFrequency() {
return Duration.ofSeconds(3);
return pollingFrequency;
}

@Override
Expand Down

0 comments on commit ff951d7

Please sign in to comment.