Skip to content

Commit

Permalink
[HWKMETRICS-613] Add a test for running the scheduled metrics expirat…
Browse files Browse the repository at this point in the history
…ion job via the scheduler.
  • Loading branch information
Stefan Negrea committed Mar 30, 2017
1 parent 115ce03 commit 962c077
Showing 1 changed file with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static java.util.Arrays.asList;

import static org.hawkular.metrics.model.MetricType.COUNTER;
import static org.hawkular.metrics.model.MetricType.GAUGE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -35,7 +35,6 @@
import org.hawkular.metrics.core.service.DataAccess;
import org.hawkular.metrics.core.service.DataAccessImpl;
import org.hawkular.metrics.core.service.MetricsServiceImpl;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
Expand All @@ -54,6 +53,8 @@
import com.google.common.collect.ImmutableMap;

import rx.Observable;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;

/**
* Test the job that deletes expired metrics.
Expand All @@ -71,7 +72,7 @@ public class DeleteExpiredMetricsJobITest extends BaseITest {
private ConfigurationService configurationService;
private TestScheduler jobScheduler;
private PreparedStatement resetConfig;

private JobDetails deleteExpiredMetricsJob;
private String jobName;

@BeforeClass
Expand Down Expand Up @@ -99,18 +100,18 @@ public void initTest(Method method) {
jobName = method.getName();

jobScheduler = new TestScheduler(rxSession);
long nextStart = LocalDateTime.now(ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli() - 60000;
jobScheduler.advanceTimeTo(nextStart);
jobScheduler.truncateTables(getKeyspace());

jobsService = new JobsServiceImpl(2, 7);
jobsService.setSession(rxSession);
jobsService.setScheduler(jobScheduler);
jobsService.setMetricsService(metricsService);
jobsService.setConfigurationService(configurationService);
jobsService.start();

deleteExpiredMetricsJob = jobsService.start().stream()
.filter(details -> details.getJobName().equals(DeleteExpiredMetrics.JOB_NAME))
.findFirst().get();
assertNotNull(deleteExpiredMetricsJob);
}

@AfterMethod(alwaysRun = true)
Expand All @@ -119,7 +120,7 @@ public void tearDown() {
}

@Test
public void testCompressJob() throws Exception {
public void testOnDemandDeleteExpiredMetricsJob() throws Exception {
String tenantId = nextTenantId();
DateTime start = new DateTime(jobScheduler.now());

Expand All @@ -145,7 +146,7 @@ public void testCompressJob() throws Exception {
assertEquals(metrics.size(), 3);

long expiration = 1;
runDeleteExpiredMetricsJob(expiration);
runOnDemandDeleteExpiredMetricsJob(expiration);
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 3);

Expand All @@ -154,7 +155,7 @@ public void testCompressJob() throws Exception {
//When running the purge at 22 days - 2 hours, G2 should still be present
for (int i = 18; i < 23; i++) {
expiration = System.currentTimeMillis() + (i * 24 - 2) * 3600 * 1000L;
runDeleteExpiredMetricsJob(expiration);
runOnDemandDeleteExpiredMetricsJob(expiration);
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 2);
for (Metric<?> metric : metrics) {
Expand All @@ -163,18 +164,18 @@ public void testCompressJob() throws Exception {
}

expiration = System.currentTimeMillis() + 28 * 24 * 3600 * 1000L;
runDeleteExpiredMetricsJob(expiration);
runOnDemandDeleteExpiredMetricsJob(expiration);
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 1);
assertEquals(metrics.get(0).getId(), "G3");

expiration = System.currentTimeMillis() + 32 * 24 * 3600 * 1000L;
runDeleteExpiredMetricsJob(expiration);
runOnDemandDeleteExpiredMetricsJob(expiration);
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 0);
}

private void runDeleteExpiredMetricsJob(long time) throws InterruptedException {
private void runOnDemandDeleteExpiredMetricsJob(long time) throws InterruptedException {
JobDetails runJobDetails = jobsService.submitDeleteExpiredMetricsJob(time, jobName).toBlocking().value();
CountDownLatch latch = new CountDownLatch(1);
jobScheduler.onJobFinished(jobDetails -> {
Expand All @@ -186,6 +187,41 @@ private void runDeleteExpiredMetricsJob(long time) throws InterruptedException {
assertTrue(latch.await(10, TimeUnit.SECONDS));
}

@Test
public void testScheduleDeleteExpiredMetricsJob() throws Exception {
String tenantId = nextTenantId();

Metric<Long> c1 = new Metric<>(new MetricId<>(tenantId, COUNTER, "C1"),
ImmutableMap.of("x", "1", "y", "2"), 1);
Metric<Long> c2 = new Metric<>(new MetricId<>(tenantId, COUNTER, "C2"),
ImmutableMap.of("x", "2", "y", "3"), 12);

doAction(() -> metricsService.createMetric(c1, true));
ListenableFutureObservable
.from(dataAccess.updateMetricExpirationIndex(c1.getMetricId(),
System.currentTimeMillis() - 3 * 24 * 3600 * 1000L), Schedulers.immediate());
doAction(() -> metricsService.createMetric(c2, true));

List<Metric<Long>> metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, COUNTER));
assertEquals(metrics.size(), 2);

waitForScheduledDeleteExpiredMetricsJob();
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, COUNTER));
assertEquals(metrics.size(), 1);
assertEquals(metrics.get(0).getId(), "C2");
}

private void waitForScheduledDeleteExpiredMetricsJob() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
jobScheduler.onJobFinished(jobDetails -> {
latch.countDown();
});

jobScheduler.advanceTimeTo(deleteExpiredMetricsJob.getTrigger().getTriggerTime());
jobScheduler.advanceTimeBy(2);
assertTrue(latch.await(25, TimeUnit.SECONDS));
}

private String nextTenantId() {
return "T" + tenantCounter.getAndIncrement();
}
Expand Down

0 comments on commit 962c077

Please sign in to comment.