Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compaction IT that verifies queues are cleared when tablets no longer need to compact #4466

Merged
merged 10 commits into from
May 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.compaction;

import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -42,6 +43,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -72,12 +74,14 @@
import org.apache.accumulo.test.functional.CompactionIT;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -103,22 +107,52 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
// Name of the compaction service the test table is dispatched to; it is passed as the
// "table.compaction.dispatcher.opts.service" table property in setupMetricsTest().
public static final String QUEUE1_SERVICE = "Q1";
// NOTE(review): presumably the configured size of the Q1 priority queue; its use is not visible
// in this section - confirm against the cluster configuration callback.
public static final int QUEUE1_SIZE = 6;

// State shared between the per-test metrics tailer thread and the test methods.
// queueMetrics receives every parsed compactor queue metric seen by the tailer thread.
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
// Set to true to ask the tailer thread to exit its polling loop.
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
// Created and started in setupMetricsTest(), joined in teardownMetricsTest().
Thread metricsTailer;

/**
 * Per-test setup. Stops all compactor servers and waits until none are registered, creates a
 * uniquely named table whose compactions are dispatched to {@link #QUEUE1_SERVICE} and whose
 * tablets start out with {@link TabletAvailability#HOSTED}, then starts a background thread that
 * copies compactor queue metrics from the StatsD sink into {@code queueMetrics}.
 *
 * @throws Exception if the cluster cannot be reconfigured or the table cannot be created
 */
@BeforeEach
public void setupMetricsTest() throws Exception {
  getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
  Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty());
  try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
    tableName = getUniqueNames(1)[0];

    Map<String,String> props =
        Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
            "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
    // Single declaration of ntc: the table is created with hosted tablets.
    NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
        .withInitialTabletAvailability(TabletAvailability.HOSTED);
    c.tableOperations().create(tableName, ntc);

    tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
    aconf = getCluster().getServerContext().getConfiguration();
    fs = getCluster().getFileSystem();
    rootPath = getCluster().getTemporaryPath().toString();
  }

  // Start every test with an empty metric queue and a fresh tailer thread.
  queueMetrics.clear();
  shutdownTailer.set(false);
  metricsTailer = Threads.createThread("metric-tailer", () -> {
    while (!shutdownTailer.get()) {
      List<String> statsDMetrics = sink.getLines();
      for (String s : statsDMetrics) {
        if (shutdownTailer.get()) {
          break;
        }
        // Only compactor queue metrics are of interest to these tests.
        if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
          queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
        }
      }
    }
  });
  metricsTailer.start();
}

/**
 * Per-test teardown: signals the metrics tailer thread to stop and waits for it to exit.
 *
 * @throws Exception if interrupted while waiting for the tailer thread to finish
 */
@AfterEach
public void teardownMetricsTest() throws Exception {
  shutdownTailer.set(true);
  // JUnit still invokes @AfterEach when @BeforeEach fails, in which case the tailer thread may
  // never have been created; guard so the original setup failure is not masked by an NPE.
  if (metricsTailer != null) {
    metricsTailer.join();
  }
}

private String getDir(String testName) throws Exception {
Expand Down Expand Up @@ -254,24 +288,6 @@ private static String row(int r) {

@Test
public void testQueueMetrics() throws Exception {
// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);

Thread thread = Threads.createThread("metric-tailer", () -> {
while (!shutdownTailer.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (shutdownTailer.get()) {
break;
}
if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
}
}
}
});
thread.start();

long highestFileCount = 0L;
ServerContext context = getCluster().getServerContext();
Expand All @@ -282,11 +298,8 @@ public void testQueueMetrics() throws Exception {
fs.mkdirs(new Path(dir));

// Create splits so there are two groupings of tablets with similar file counts.
List<String> splitPoints =
List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000");
for (String splitPoint : splitPoints) {
addSplits(c, tableName, splitPoint);
}
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);

for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
Expand Down Expand Up @@ -322,9 +335,8 @@ public void testQueueMetrics() throws Exception {
}
}
}
// Current poll rate of the TestStatsDRegistryFactory is 3 seconds
// If metrics are not found in the queue, sleep until the next poll.
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}

// Set lowest priority to the lowest possible system compaction priority
Expand Down Expand Up @@ -380,7 +392,7 @@ public void testQueueMetrics() throws Exception {
boolean emptyQueue = false;

// Make sure that metrics added to the queue are recent
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());

while (!emptyQueue) {
while (!queueMetrics.isEmpty()) {
Expand All @@ -401,10 +413,67 @@ public void testQueueMetrics() throws Exception {
}
}
}
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}
}

/**
 * Test that the compaction queue is cleared when compactions no longer need to happen.
 *
 * <p>
 * The test splits the table, bulk imports 100 files, waits for the queued-jobs metric to become
 * positive, then raises the table's compaction ratio and max file count and waits for the
 * queued-jobs metric to drop back to zero.
 */
@Test
public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
  ServerContext context = getCluster().getServerContext();
  try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {

    String dir = getDir("/testBulkFile-");
    FileSystem fs = getCluster().getFileSystem();
    fs.mkdirs(new Path(dir));

    // Create splits so there are two groupings of tablets with similar file counts.
    String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
    addSplits(c, tableName, splitString);

    for (int i = 0; i < 100; i++) {
      writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
    }
    c.tableOperations().importDirectory(dir).to(tableName).load();
    verifyData(c, tableName, 0, 100 * 100 - 1, false);
  }

  // The metrics tailer must keep running while the queue metrics are polled below, so it is
  // not shut down here; the @AfterEach teardown stops and joins it.
  final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis();

  // wait for compaction jobs to be queued
  Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis,
      "Expected to see compaction jobs queued");

  // change compactor settings so that compactions no longer need to run
  context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000");
  context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000");

  // wait for queue to clear
  Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
      "Expected job queue to be cleared once compactions no longer need to happen");
}

/**
* @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found.
*/
private int getJobsQueued() throws InterruptedException {
DomGarguilo marked this conversation as resolved.
Show resolved Hide resolved
Integer jobsQueued = null;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
jobsQueued = Integer.parseInt(metric.getValue());
}
}
if (jobsQueued == null) {
log.warn("No compaction job queue metrics found.");
return -1;
}
log.info("Jobs Queued: {}", jobsQueued);
return jobsQueued;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory {
// NOTE(review): presumably the property keys that carry the test StatsD sink's host and port;
// where they are read is not visible in this section - confirm.
public static final String SERVER_HOST = "test.meter.registry.host";
public static final String SERVER_PORT = "test.meter.registry.port";

// Interval returned by the registry config's pollingFrequency(); exposed as a constant so tests
// can sleep for exactly one polling interval instead of hard-coding a duration.
public static final Duration pollingFrequency = Duration.ofSeconds(3);

@Override
public MeterRegistry create(final InitParameters params) {
LOG.info("starting metrics registration.");
Expand Down Expand Up @@ -77,7 +79,7 @@ public StatsdProtocol protocol() {

@Override
public Duration pollingFrequency() {
  // Return the shared constant (rather than a second hard-coded value) so that tests sleeping
  // on TestStatsDRegistryFactory.pollingFrequency stay in step with the registry.
  return pollingFrequency;
}

@Override
Expand Down