Skip to content

Commit

Permalink
Simplify test case
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed May 10, 2024
1 parent 7be2494 commit 9a63f3e
Showing 1 changed file with 25 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,85 +437,43 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
}
c.tableOperations().importDirectory(dir).to(tableName).load();

try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) {
// Get each tablet's file sizes
for (TabletMetadata tablet : tm) {
long fileSize = tablet.getFiles().size();
log.debug("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize);
}
}
verifyData(c, tableName, 0, 100 * 100 - 1, false);
}

final long sleepMillis = TestStatsDRegistryFactory.pollingFrequency.toMillis();

Wait.waitFor(() -> {
while (!queueMetrics.isEmpty()) {
var qm = queueMetrics.take();
if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
if (Integer.parseInt(qm.getValue()) > 0) {
return true;
}
}
}
return false;
}, 60_000, sleepMillis, "did not see Q1 metrics");

Wait.waitFor(() -> {
int queueSize = 0;
boolean sawQueues = false;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
queueSize = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
sawQueues = true;
} else {
log.debug("Other metric not used in the test: {}", metric);
}
}
return queueSize == QUEUE1_SIZE && sawQueues;
}, 60_000, sleepMillis, "did not see the expected number of queued compactions");
// wait for compaction jobs to be queued
Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis,
"Expected to see compaction jobs queued");

// change compactor settings so that compactions no longer need to run
context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000");
context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000");

// wait for queue to clear
Wait.waitFor(() -> {
int jobsQueued = QUEUE1_SIZE;
int queueLength = 0;
long dequeued = 0;
long rejected = 0;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
jobsQueued = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
queueLength = Integer.parseInt(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
rejected = Long.parseLong(metric.getValue());
} else if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
dequeued = Long.parseLong(metric.getValue());
}
Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
"Expected job queue to be cleared once compactions no longer need to happen");
}

/**
* @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found.
*/
private int getJobsQueued() throws InterruptedException {
Integer jobsQueued = null;
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
if (metric.getName()
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
jobsQueued = Integer.parseInt(metric.getValue());
}
log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength,
jobsQueued, dequeued, rejected);
return jobsQueued == 0;
}, 60_000, sleepMillis,
"expected job queue to be cleared once compactions no longer need to happen");
}
if (jobsQueued == null) {
log.warn("No compaction job queue metrics found.");
return -1;
}
log.info("Jobs Queued: {}", jobsQueued);
return jobsQueued;
}

}

0 comments on commit 9a63f3e

Please sign in to comment.