Merge pull request #982 from jsanda/hwkmetrics-772
[HWKMETRICS-772] release/0.29.0 branch
John Sanda committed Jun 25, 2018
2 parents b0410e2 + fb31400 commit 843de22
Showing 2 changed files with 111 additions and 68 deletions.
CompressDataJobITest.java
@@ -29,10 +29,8 @@

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,7 +61,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableMap;

@@ -79,31 +76,30 @@ public class CompressDataJobITest extends BaseITest {

private static Logger logger = Logger.getLogger(CompressDataJobITest.class);

private static long TIMEOUT = 25;

private static AtomicInteger tenantCounter = new AtomicInteger();
private MetricsServiceImpl metricsService;
private DataAccess dataAccess;
private JobsServiceImpl jobsService;
private ConfigurationService configurationService;
private TestScheduler jobScheduler;
private PreparedStatement resetConfig;
private PreparedStatement resetConfig2;

private boolean firstExecute = true;

private JobDetails compressionJob;
private long triggerTime;

@BeforeClass
public void initClass() {
dataAccess = TestDataAccessFactory.newInstance(session);

resetConfig = session.prepare("DELETE FROM sys_config WHERE config_id = 'org.hawkular.metrics.jobs." +
JOB_NAME + "'");

resetConfig2 = session.prepare("DELETE FROM sys_config WHERE config_id = 'org.hawkular.metrics.jobs." +
TempTableCreator.JOB_NAME + "'");

session.execute(resetConfig.bind());
session.execute(resetConfig2.bind());

/**
* A new job scheduler is initialized for each test method. This method performs setup that does not need to be
* repeated for each test. The TempTableCreator job needs to run in order to create the raw temp tables, so it is
* run here, while the compression job is not. This avoids unnecessary work and makes it easier to test things in
* isolation.
*/
@BeforeClass
public void initClass() throws Exception {
// Since the temp table creator job does not run for every test method, we do not want to drop the temp table
// at the end of the compression job. Doing so could result in subsequent test failures.
dataAccess = TestDataAccessFactory.newInstance(session, DateTimeService.now.get(), false);

configurationService = new ConfigurationService();
configurationService.init(rxSession);
@@ -113,6 +109,17 @@ public void initClass() {
metricsService.setConfigurationService(configurationService);
metricsService.startUp(session, getKeyspace(), true, metricRegistry);

// Remove job configurations that might be left over from any other test runs. This has to be done; otherwise,
// new instances of the jobs will not get created and scheduled.
removeJobConfig(TempTableCreator.CONFIG_ID);
removeJobConfig(TempDataCompressor.CONFIG_ID);

// This is a bit of a hack to keep the compression job from getting scheduled. If JobsManager finds a
// configuration for a job in the sys_config table, it will not schedule the job because it assumes the job is
// already scheduled. We do this because we only want to create the temp tables that the test methods need.
saveJobConfigJobId(TempDataCompressor.CONFIG_ID, UUID.randomUUID().toString());

jobScheduler = new TestScheduler(rxSession);
jobScheduler.truncateTables(getKeyspace());

@@ -127,72 +134,98 @@ public void initClass() {

JobDetails tableCreator =
jobDetails.stream().filter(d -> d.getJobName().equalsIgnoreCase(TempTableCreator.JOB_NAME))
.findFirst().get();
.findFirst().orElse(null);
assertNotNull(tableCreator);

CountDownLatch latch = new CountDownLatch(1);
jobScheduler.onJobFinished(details -> {
if(details.getJobName().equals(TempTableCreator.JOB_NAME)) {
if (details.getJobName().equals(TempTableCreator.JOB_NAME)) {
latch.countDown();
}
});

jobScheduler.advanceTimeTo(tableCreator.getTrigger().getTriggerTime());
jobScheduler.advanceTimeBy(1);

try {
assertTrue(latch.await(25, TimeUnit.SECONDS)); // Wait for tables to be ready
Thread.sleep(3000); // Wait for the prepared statements to be initialized even in Travis
} catch (InterruptedException e) {
assertTrue(false);
}
assertTrue(latch.await(TIMEOUT, TimeUnit.SECONDS));

compressionJob = jobDetails
.stream()
.filter(details -> details.getJobName().equals(JOB_NAME))
.findFirst().get();

long nextStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(jobScheduler.now()), ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli();

CountDownLatch latch2 = new CountDownLatch(1);
jobScheduler.onJobFinished(details -> {
if(details.getJobName().equals(JOB_NAME)) {
latch2.countDown();
}
});

jobScheduler.advanceTimeTo(nextStart);
jobScheduler.advanceTimeBy(1);
assertNotNull(compressionJob);
try {
assertTrue(latch.await(25, TimeUnit.SECONDS)); // Wait for first compression to pass
} catch (InterruptedException e) {
assertTrue(false);
}

// We have to remove this configuration; otherwise, the job will not get scheduled and the initTest method
// will fail.
removeJobConfig(TempDataCompressor.CONFIG_ID);

jobScheduler.shutdown();
}

private void removeJobConfig(String jobConfigId) {
boolean deleted = configurationService.delete(jobConfigId).await(TIMEOUT, TimeUnit.SECONDS);
assertTrue(deleted);
}

private void saveJobConfigJobId(String jobConfigId, String jobId) {
boolean saved = configurationService.save(jobConfigId, "jobId", jobId)
.toCompletable()
.await(TIMEOUT, TimeUnit.SECONDS);
assertTrue(saved);
}

@BeforeMethod
public void initTest(Method method) {
public void initTest(Method method) throws Exception {
logger.debug("Starting [" + method.getName() + "]");

if(!firstExecute) {
jobScheduler.advanceTimeBy(120);
}
jobScheduler = new TestScheduler(rxSession);
jobScheduler.truncateTables(getKeyspace());

// We use the same hack here again, creating the job configuration up front to prevent the TempTableCreator job
// from getting scheduled, so that the compression job can be tested in isolation.
saveJobConfigJobId(TempTableCreator.CONFIG_ID, UUID.randomUUID().toString());

List<JobDetails> jobDetails = jobsManager.installJobs();

jobsService = new JobsServiceImpl();
jobsService.setSession(rxSession);
jobsService.setScheduler(jobScheduler);
jobsService.setMetricsService(metricsService);
jobsService.setConfigurationService(configurationService);
jobsService.start();

compressionJob = jobDetails
.stream()
.filter(details -> details.getJobName().equals(JOB_NAME))
.findFirst().orElse(null);
assertNotNull(compressionJob);

triggerTime = compressionJob.getTrigger().getTriggerTime();

// We advance the scheduler's clock to triggerTime. Note that we cannot advance the clock any further because
// test methods need to set up test data before the job runs. Each test method is then responsible for
// advancing the clock to trigger the job.
jobScheduler.advanceTimeTo(triggerTime);
CountDownLatch latch = new CountDownLatch(1);
jobScheduler.onTimeSliceFinished(time -> {
if (time.equals(new DateTime(triggerTime).minusMinutes(1))) {
latch.countDown();
}
});
assertTrue(latch.await(TIMEOUT, TimeUnit.SECONDS));
}

@AfterMethod(alwaysRun = true)
public void tearDown() {
// We need to once again remove the job configuration; otherwise, the job will not get rescheduled in the
// initTest method, and it will fail.
removeJobConfig(TempDataCompressor.CONFIG_ID);
jobScheduler.shutdown();
}

@AfterClass(alwaysRun = true)
public void shutdown() {
dataAccess.shutdown();
}

@Test(priority = 1)
@Test
public void testCompressJob() throws Exception {
long now = jobScheduler.now();
long now = triggerTime;

DateTime start = DateTimeService.getTimeSlice(new DateTime(now, DateTimeZone.UTC).minusHours(2),
Duration.standardHours(2)).plusMinutes(30);
@@ -218,10 +251,9 @@ public void testCompressJob() throws Exception {
}
});

jobScheduler.advanceTimeTo(compressionJob.getTrigger().getTriggerTime());
jobScheduler.advanceTimeBy(1);

assertTrue(latch.await(25, TimeUnit.SECONDS));
assertTrue(latch.await(TIMEOUT, TimeUnit.SECONDS));
long startSlice = DateTimeService.getTimeSlice(start.getMillis(), Duration.standardHours(2));
long endSlice = DateTimeService.getTimeSlice(jobScheduler.now(), Duration.standardHours(2));

@@ -241,8 +273,6 @@

assertNotNull(c_value);
assertNull(tags);

firstExecute = false;
}

private <T> void testCompressResults(MetricType<T> type, Metric<T> metric, DateTime start) throws
@@ -260,7 +290,7 @@ private <T> void testCompressResults(MetricType<T> type, Metric<T> metric, DateT

jobScheduler.advanceTimeBy(1);

assertTrue(latch.await(25, TimeUnit.SECONDS));
assertTrue(latch.await(TIMEOUT, TimeUnit.SECONDS));
long startSlice = DateTimeService.getTimeSlice(start.getMillis(), Duration.standardHours(2));
long endSlice = DateTimeService.getTimeSlice(start.plusHours(1).plusMinutes(59).getMillis(), Duration
.standardHours(2));
@@ -280,9 +310,9 @@ private <T> void testCompressResults(MetricType<T> type, Metric<T> metric, DateT
assertEquals(metric.getDataPoints(), compressedPoints);
}

@Test(dependsOnMethods={"testCompressJob"})
@Test
public void testGaugeCompress() throws Exception {
long now = jobScheduler.now();
long now = triggerTime;

DateTime start = DateTimeService.getTimeSlice(new DateTime(now, DateTimeZone.UTC).minusHours(2),
Duration.standardHours(2)).plusMinutes(30);
@@ -303,9 +333,9 @@ public void testGaugeCompress() throws Exception {
testCompressResults(GAUGE, m1, start);
}

@Test(dependsOnMethods={"testCompressJob"})
@Test
public void testCounterCompress() throws Exception {
long now = jobScheduler.now();
long now = triggerTime;

DateTime start = DateTimeService.getTimeSlice(new DateTime(now, DateTimeZone.UTC).minusHours(2),
Duration.standardHours(2)).plusMinutes(30);
@@ -325,9 +355,9 @@ public void testCounterCompress() throws Exception {
testCompressResults(COUNTER, m1, start);
}

@Test(dependsOnMethods={"testCompressJob"})
@Test
public void testAvailabilityCompress() throws Exception {
long now = jobScheduler.now(); // I need to advance the triggerTime of compression job also
long now = triggerTime;

DateTime start = DateTimeService.getTimeSlice(new DateTime(now, DateTimeZone.UTC).minusHours(2),
Duration.standardHours(2)).plusMinutes(30);
@@ -347,9 +377,9 @@ public void testAvailabilityCompress() throws Exception {
testCompressResults(AVAILABILITY, m1, start);
}

@Test(dependsOnMethods={"testCompressJob"})
@Test
public void testGaugeWithTags() throws Exception {
long now = jobScheduler.now();
long now = triggerTime;

DateTime start = DateTimeService.getTimeSlice(new DateTime(now, DateTimeZone.UTC).minusHours(2),
Duration.standardHours(2)).plusMinutes(30);
@@ -370,7 +400,7 @@ public void testGaugeWithTags() throws Exception {
testCompressResults(GAUGE, m1, start);
}

@Test(dependsOnMethods={"testCompressJob"})
@Test
public void testCompressRetentionIndex() throws Exception {
long now = jobScheduler.now();

TestDataAccessFactory.java
@@ -28,8 +28,10 @@
import org.hawkular.metrics.datetime.DateTimeService;
import org.joda.time.DateTime;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

import rx.Observable;
import rx.schedulers.Schedulers;

/**
@@ -44,6 +46,10 @@ public static DataAccess newInstance(Session session) {
}

public static DataAccess newInstance(Session session, DateTime now) {
return newInstance(session, now, true);
}

public static DataAccess newInstance(Session session, DateTime now, boolean dropTempTables) {
session.execute(String.format("USE %s", BaseITest.getKeyspace()));
final CountDownLatch latch = new CountDownLatch(3);
final CountDownLatch fallBackTable = new CountDownLatch(0);
@@ -58,6 +64,13 @@ void prepareTempStatements(String tableName, Long mapKey) {
latch.countDown();
}
}

@Override public Observable<ResultSet> dropTempTable(long timestamp) {
if (dropTempTables) {
return super.dropTempTable(timestamp);
}
return Observable.empty();
}
};
dataAccess.createTempTablesIfNotExists(tableListForTesting(now))
.subscribeOn(Schedulers.io())
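For readers unfamiliar with the trick used throughout this test: JobsManager skips scheduling any job that already has an entry in the sys_config table, so the test pre-saves a throwaway jobId to suppress a job and deletes the entry when the job should actually be installed. The class below is a minimal illustrative sketch of that pattern, not part of this commit; it assumes the same ConfigurationService rx API that the diff above uses, and the class and method names are hypothetical.

import java.util.UUID;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; ConfigurationService is assumed to be the same
// class used by CompressDataJobITest (import omitted here).
class JobSchedulingToggle {

    private static final long TIMEOUT = 25;

    private final ConfigurationService configurationService;

    JobSchedulingToggle(ConfigurationService configurationService) {
        this.configurationService = configurationService;
    }

    // Saving any jobId under the job's config id makes JobsManager treat the
    // job as already scheduled, so installJobs() will skip it.
    boolean suppress(String jobConfigId) {
        return configurationService.save(jobConfigId, "jobId", UUID.randomUUID().toString())
                .toCompletable()
                .await(TIMEOUT, TimeUnit.SECONDS);
    }

    // Deleting the config entry lets the next installJobs() call create and
    // schedule a fresh instance of the job.
    boolean allow(String jobConfigId) {
        return configurationService.delete(jobConfigId)
                .await(TIMEOUT, TimeUnit.SECONDS);
    }
}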