Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compaction IT that verifies queues are cleared when tablets no longer need to compact #4466

Merged
merged 10 commits into from
May 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.compaction;

import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -42,6 +43,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -72,12 +74,14 @@
import org.apache.accumulo.test.functional.CompactionIT;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -103,22 +107,52 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
public static final String QUEUE1_SERVICE = "Q1";
public static final int QUEUE1_SIZE = 6;

// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
Thread metricsTailer;

@BeforeEach
public void setupMetricsTest() throws Exception {
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty());
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
tableName = getUniqueNames(1)[0];

Map<String,String> props =
Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
"table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props);
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
.withInitialTabletAvailability(TabletAvailability.HOSTED);
c.tableOperations().create(tableName, ntc);

tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
aconf = getCluster().getServerContext().getConfiguration();
fs = getCluster().getFileSystem();
rootPath = getCluster().getTemporaryPath().toString();
}
queueMetrics.clear();
shutdownTailer.set(false);
metricsTailer = Threads.createThread("metric-tailer", () -> {
while (!shutdownTailer.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (shutdownTailer.get()) {
break;
}
if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
}
}
}
});
metricsTailer.start();
}

@AfterEach
public void teardownMetricsTest() throws Exception {
shutdownTailer.set(true);
metricsTailer.join();
}

private String getDir(String testName) throws Exception {
Expand Down Expand Up @@ -254,24 +288,6 @@ private static String row(int r) {

@Test
public void testQueueMetrics() throws Exception {
// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
final AtomicBoolean shutdownTailer = new AtomicBoolean(false);

Thread thread = Threads.createThread("metric-tailer", () -> {
while (!shutdownTailer.get()) {
List<String> statsDMetrics = sink.getLines();
for (String s : statsDMetrics) {
if (shutdownTailer.get()) {
break;
}
if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
}
}
}
});
thread.start();

long highestFileCount = 0L;
ServerContext context = getCluster().getServerContext();
Expand All @@ -282,11 +298,8 @@ public void testQueueMetrics() throws Exception {
fs.mkdirs(new Path(dir));

// Create splits so there are two groupings of tablets with similar file counts.
List<String> splitPoints =
List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000");
for (String splitPoint : splitPoints) {
addSplits(c, tableName, splitPoint);
}
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);

for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
Expand Down Expand Up @@ -322,9 +335,8 @@ public void testQueueMetrics() throws Exception {
}
}
}
// Current poll rate of the TestStatsDRegistryFactory is 3 seconds
// If metrics are not found in the queue, sleep until the next poll.
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}

// Set lowest priority to the lowest possible system compaction priority
Expand Down Expand Up @@ -380,7 +392,7 @@ public void testQueueMetrics() throws Exception {
boolean emptyQueue = false;

// Make sure that metrics added to the queue are recent
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());

while (!emptyQueue) {
while (!queueMetrics.isEmpty()) {
Expand All @@ -401,10 +413,109 @@ public void testQueueMetrics() throws Exception {
}
}
}
UtilWaitThread.sleep(3500);
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}
}

shutdownTailer.set(true);
thread.join();
/**
* Test that the compaction queue is cleared when compactions no longer need to happen.
*/
@Test
public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
ServerContext context = getCluster().getServerContext();
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {

String dir = getDir("/testBulkFile-");
FileSystem fs = getCluster().getFileSystem();
fs.mkdirs(new Path(dir));

// Create splits so there are two groupings of tablets with similar file counts.
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);

for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
}
c.tableOperations().importDirectory(dir).to(tableName).load();

try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) {
// Get each tablet's file sizes
for (TabletMetadata tablet : tm) {
long fileSize = tablet.getFiles().size();
log.debug("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize);
}
}
verifyData(c, tableName, 0, 100 * 100 - 1, false);
}

final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis();

Wait.waitFor(() -> {
while (!queueMetrics.isEmpty()) {
var qm = queueMetrics.take();
if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
if (Integer.parseInt(qm.getValue()) > 0) {
return true;
}
}
}
return false;
}, 60_000, sleepMillis, "did not see Q1 metrics");

Wait.waitFor(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this wait loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized I pulled in a lot of unneeded stuff from the other test case while developing the changes for this PR. In 9a63f3e I removed a lot so that its easier to track whats going on and only kept whats necessary to test what we are trying to test here.

int queueSize = 0;
boolean sawQueues = false;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
queueSize = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
sawQueues = true;
} else {
log.debug("Other metric not used in the test: {}", metric);
}
}
return queueSize == QUEUE1_SIZE && sawQueues;
}, 60_000, sleepMillis, "did not see the expected number of queued compactions");

// change compactor settings so that compactions no longer need to run
context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000");
context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000");

// wait for queue to clear
Wait.waitFor(() -> {
int jobsQueued = QUEUE1_SIZE;
int queueLength = 0;
long dequeued = 0;
long rejected = 0;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
jobsQueued = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
queueLength = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
rejected = Long.parseLong(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
dequeued = Long.parseLong(metric.getValue());
}
}
log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength,
jobsQueued, dequeued, rejected);
return jobsQueued == 0;
}, 60_000, sleepMillis,
"expected job queue to be cleared once compactions no longer need to happen");
DomGarguilo marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory {
public static final String SERVER_HOST = "test.meter.registry.host";
public static final String SERVER_PORT = "test.meter.registry.port";

public static final Duration pollingFrequency = Duration.ofSeconds(3);

@Override
public MeterRegistry create(final InitParameters params) {
LOG.info("starting metrics registration.");
Expand Down Expand Up @@ -77,7 +79,7 @@ public StatsdProtocol protocol() {

@Override
public Duration pollingFrequency() {
return Duration.ofSeconds(3);
return pollingFrequency;
}

@Override
Expand Down