Skip to content

Commit

Permalink
Fix CompressJobITests and add new shutdown() method to DataAccess
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Jul 4, 2017
1 parent 278c343 commit d4b8e55
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,6 @@ <T> Observable<ResultSet> deleteAndInsertCompressedGauge(MetricId<T> id, long ti
<T> Observable<ResultSet> deleteFromMetricExpirationIndex(MetricId<T> id);

<T> Observable<Row> findMetricExpiration(MetricId<T> id);

void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public class DataAccessImpl implements DataAccess {
// TODO Move all of these to a new class (Cassandra specific temp table) to allow multiple implementations (such
// as in-memory + WAL in Cassandra)

private TemporaryTableStatementCreator tableCreator;

private enum StatementType {
READ, WRITE, SCAN, CREATE, DELETE
}
Expand Down Expand Up @@ -468,7 +470,8 @@ void checkTempOperationalStatus(int preparedTempTables) {

private void initializeTemporaryTableStatements() {
prepMap = new ConcurrentSkipListMap<>();
session.getCluster().register(new TemporaryTableStatementCreator());
tableCreator = new TemporaryTableStatementCreator();
session.getCluster().register(tableCreator);

int preparedTempTables = 0;

Expand Down Expand Up @@ -1667,4 +1670,8 @@ void removeTempStatements(String tableName) {
Long mapKey = tableToMapKey(tableName);
prepMap.remove(mapKey);
}

@Override public void shutdown() {
session.getCluster().unregister(tableCreator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ public void shutdown() {
insertedDataPointEvents.onCompleted();
metricsTasks.shutdown();
unloadDataRetentions();
dataAccess.shutdown();
}

private <T> T time(Timer timer, Callable<T> callable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static java.util.Arrays.asList;

import static org.hawkular.metrics.core.jobs.CompressData.JOB_NAME;
import static org.hawkular.metrics.core.jobs.TempDataCompressor.JOB_NAME;
import static org.hawkular.metrics.model.MetricType.AVAILABILITY;
import static org.hawkular.metrics.model.MetricType.COUNTER;
import static org.hawkular.metrics.model.MetricType.GAUGE;
Expand All @@ -29,6 +29,7 @@

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
Expand All @@ -38,9 +39,9 @@

import org.hawkular.metrics.core.service.BaseITest;
import org.hawkular.metrics.core.service.DataAccess;
import org.hawkular.metrics.core.service.DataAccessImpl;
import org.hawkular.metrics.core.service.MetricsServiceImpl;
import org.hawkular.metrics.core.service.Order;
import org.hawkular.metrics.core.service.TestDataAccessFactory;
import org.hawkular.metrics.core.service.transformers.DataPointDecompressTransformer;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.AvailabilityType;
Expand Down Expand Up @@ -68,11 +69,6 @@
import rx.Observable;
import rx.observers.TestSubscriber;

//import static org.junit.Assert.assertEquals;
//import static org.junit.Assert.assertNotNull;
//import static org.junit.Assert.assertNull;
//import static org.junit.Assert.assertTrue;

/**
* Test the compression ETL jobs
*
Expand All @@ -94,12 +90,12 @@ public class CompressDataJobITest extends BaseITest {

@BeforeClass
public void initClass() {
dataAccess = new DataAccessImpl(session);
dataAccess = TestDataAccessFactory.newInstance(session);

resetConfig = session.prepare("DELETE FROM sys_config WHERE config_id = 'org.hawkular.metrics.jobs." +
JOB_NAME + "'");

configurationService = new ConfigurationService() ;
configurationService = new ConfigurationService();
configurationService.init(rxSession);

metricsService = new MetricsServiceImpl();
Expand All @@ -115,10 +111,14 @@ public void initTest(Method method) {
session.execute(resetConfig.bind());

jobScheduler = new TestScheduler(rxSession);
long nextStart = LocalDateTime.now(ZoneOffset.UTC)

// To recreate the temporary tables
dataAccess = TestDataAccessFactory.newInstance(session);
metricsService.setDataAccess(dataAccess);

long nextStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(jobScheduler.now()), ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli() - 60000;
jobScheduler.advanceTimeTo(nextStart);
.toInstant(ZoneOffset.UTC).toEpochMilli();
jobScheduler.truncateTables(getKeyspace());

jobsService = new JobsServiceImpl();
Expand All @@ -129,12 +129,14 @@ public void initTest(Method method) {
compressionJob = jobsService.start().stream().filter(details -> details.getJobName().equals(JOB_NAME))
.findFirst().get();

jobScheduler.advanceTimeTo(nextStart);
assertNotNull(compressionJob);
}

@AfterMethod(alwaysRun = true)
public void tearDown() {
jobsService.shutdown();
dataAccess.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,8 @@ public <T> Observable<ResultSet> deleteFromMetricExpirationIndex(MetricId<T> id)
public <T> Observable<Row> findMetricExpiration(MetricId<T> id) {
return delegate.findMetricExpiration(id);
}

@Override public void shutdown() {
delegate.shutdown();
}
}

0 comments on commit d4b8e55

Please sign in to comment.