Skip to content

Commit

Permalink
Take the input time from the job, fixes tests
Browse files Browse the repository at this point in the history
Change the DataAccessITest to use a single version of time
  • Loading branch information
Michael Burman committed Mar 29, 2018
1 parent fe3001d commit 871e94c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public interface DataAccess {
*/
// Completable resetTempTable(long timestamp);

Set<Long> findExpiredTables();
Set<Long> findExpiredTables(long startTime);

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,10 +807,9 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
}

@Override
public Set<Long> findExpiredTables() {
long currentTime = System.currentTimeMillis();
Long currentTableKey = prepMap.floorKey(currentTime);
return prepMap.subMap(0L, false, currentTableKey, false).keySet();
public Set<Long> findExpiredTables(long startTime) {
Long currentTableKey = prepMap.floorKey(startTime);
return prepMap.subMap(0L, false, currentTableKey, true).keySet();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateT
@SuppressWarnings("unchecked")
public Completable compressBlock(long jobStartTimeSlice, int pageSize, int maxConcurrency) {
return Completable.fromObservable(
Observable.from(dataAccess.findExpiredTables())
Observable.from(dataAccess.findExpiredTables(jobStartTimeSlice))
.flatMap(startTimeSlice ->
dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
.switchIfEmpty(Observable.empty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.hawkular.metrics.core.service.transformers.MetricIdentifierFromFullDataRowTransformer;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class DataAccessITest extends BaseITest {
private static int DEFAULT_PAGE_SIZE = 5000;

private DataAccessImpl dataAccess;
private static final DateTime now = DateTimeService.now.get();

private PreparedStatement truncateTenants;
private PreparedStatement truncateGaugeData;
Expand All @@ -71,7 +73,7 @@ public class DataAccessITest extends BaseITest {

@BeforeClass
public void initClass() {
this.dataAccess = (DataAccessImpl) TestDataAccessFactory.newInstance(session);
this.dataAccess = (DataAccessImpl) TestDataAccessFactory.newInstance(session, now());

truncateTenants = session.prepare("TRUNCATE tenants");
truncateGaugeData = session.prepare("TRUNCATE data");
Expand Down Expand Up @@ -208,7 +210,7 @@ public void insertAndFindAvailabilities() throws Exception {

@Test
public void findAllMetricsPartitionKeys() throws Exception {
long start = now().getMillis();
long start = now.getMillis();

Observable.from(asList(
new Metric<>(new MetricId<>("t1", GAUGE, "m1"), singletonList(new DataPoint<>(start, 0.1))),
Expand All @@ -229,6 +231,10 @@ public void findAllMetricsPartitionKeys() throws Exception {
assertEquals(metrics.size(), 4);
}

private static DateTime now() {
return new DateTime(now);
}

@Test
void testFindAllDataFromBucket() throws Exception {
String tenantId = "t1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
return delegate.findMetricsInMetricsIndex(tenantId, type);
}

@Override public Set<Long> findExpiredTables() {
return delegate.findExpiredTables();
@Override public Set<Long> findExpiredTables(long startTime) {
return delegate.findExpiredTables(startTime);
}

@Override public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class TestDataAccessFactory {
private static final CoreLogger log = CoreLogging.getCoreLogger(TestDataAccessFactory.class);

public static DataAccess newInstance(Session session) {
return newInstance(session, DateTimeService.now.get());
}

public static DataAccess newInstance(Session session, DateTime now) {
session.execute(String.format("USE %s", BaseITest.getKeyspace()));
final CountDownLatch latch = new CountDownLatch(3);
final CountDownLatch fallBackTable = new CountDownLatch(0);
Expand All @@ -55,7 +59,7 @@ void prepareTempStatements(String tableName, Long mapKey) {
}
}
};
dataAccess.createTempTablesIfNotExists(tableListForTesting())
dataAccess.createTempTablesIfNotExists(tableListForTesting(now))
.subscribeOn(Schedulers.io())
.toBlocking().subscribe();
try {
Expand All @@ -70,9 +74,8 @@ void prepareTempStatements(String tableName, Long mapKey) {
/**
* Create few temporary tables for tests
*/
static Set<Long> tableListForTesting() {
static Set<Long> tableListForTesting(DateTime now) {
Set<Long> tempTables = new HashSet<>(3);
DateTime now = DateTimeService.now.get();
tempTables.add(now.getMillis());
tempTables.add(now.minusHours(2).getMillis());
tempTables.add(now.plusHours(2).getMillis());
Expand Down

0 comments on commit 871e94c

Please sign in to comment.