Skip to content

Commit

Permalink
Create TestDataAccessFactory to monitor temp table creation in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Jul 4, 2017
1 parent 56e8cd7 commit c0f75b2
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class DataAccessImpl implements DataAccess {
public static final String TEMP_TABLE_NAME_FORMAT = TEMP_TABLE_NAME_PROTOTYPE + "%d";
public static final String TEMP_TABLE_NAME_FORMAT_STRING = TEMP_TABLE_NAME_PROTOTYPE + "%s";
public static final int NUMBER_OF_TEMP_TABLES = 12;
public static final int POS_OF_OLD_DATA = NUMBER_OF_TEMP_TABLES;

public static final long DPART = 0;
private Session session;
Expand Down Expand Up @@ -179,7 +178,7 @@ public StatementType getType() {

// Create statement prototype

private static String TEMP_TABLE_BASE_CREATE = "CREATE TABLE %s ( " +
private static String TEMP_TABLE_BASE_CREATE = "CREATE TABLE IF NOT EXISTS %s ( " +
"tenant_id text, " +
"type tinyint, " +
"metric text, " +
Expand Down Expand Up @@ -372,7 +371,14 @@ private Integer getMapKey(MetricType type, TempStatement ts) {
return getMapKey(type.getCode(), ts.ordinal());
}

private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap, String tableName) {
void prepareTempStatements(String tableName) {
// Proceed to create the preparedStatements against this table
Long mapKey = tableToMapKey(tableName);

// Create an entry to the correct Long
Map<Integer, PreparedStatement> statementMap = new HashMap<>();
prepMap.put(mapKey, statementMap);

// Per metricType
for (MetricType<?> metricType : MetricType.all()) {
if(metricType == STRING) { continue; } // We don't support String metrics in temp tables yet
Expand Down Expand Up @@ -434,7 +440,7 @@ public Completable createTempTablesIfNotExists(Set<Long> timestamps) {
tables.removeAll(existingTables);

return Completable.fromObservable(Observable.from(tables)
.concatMap(t -> createTemporaryTable(t)));
.concatMap(this::createTemporaryTable));
}

Observable<ResultSet> createTemporaryTable(String tempTableName) {
Expand All @@ -448,27 +454,21 @@ private void initializeTemporaryTableStatements() {
prepMap = new ConcurrentSkipListMap<>();
session.getCluster().register(new TemporaryTableStatementCreator());

Long mapKey = null;
int preparedTempTables = 0;

// At startup we should initialize preparedStatements for all the temporary tables that are existing
// At startup we should initialize preparedStatements for all the temporary tables that exists
for (TableMetadata table : metadata.getKeyspace(session.getLoggedKeyspace()).getTables()) {
if(table.getName().startsWith(TEMP_TABLE_NAME_PROTOTYPE)) {
// Proceed to create the preparedStatements against this table
mapKey = tableToMapKey(table.getName());

// Create an entry to the correct Long
Map<Integer, PreparedStatement> statementMap = new HashMap<>();
prepMap.put(mapKey, statementMap);

// Now prepare all the necessary statements to statementMap
prepareTempStatements(statementMap, table.getName());
prepareTempStatements(table.getName());
preparedTempTables++;
}
}

// TempTableCreator should take care of creating tables, but we'll need at least one to be able to process
// writes
if(mapKey == null) {
createTemporaryTable(getTempTableName(System.currentTimeMillis()));
if(preparedTempTables < 1) {
String tableName = getTempTableName(DateTimeService.now.get().getMillis());
createTemporaryTable(tableName);
}

// Prepare the old fashioned way (data table) as fallback when out-of-order writes happen..
Expand Down Expand Up @@ -866,7 +866,7 @@ public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean

@Override
public <T> Observable<Row> findMetricInData(MetricId<T> id) {
return getPrepForAllTempTablesWithoutType(TempStatement.CHECK_EXISTENCE_OF_METRIC_IN_TABLE)
return getPrepForAllTempTables(TempStatement.CHECK_EXISTENCE_OF_METRIC_IN_TABLE)
.map(b -> b.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART))
.flatMap(b -> rxSession.executeAndFetch(b))
.concatWith(rxSession.executeAndFetch(findMetricInData
Expand Down Expand Up @@ -1019,30 +1019,13 @@ private Set<TokenRange> getTokenRanges() {
//// .andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
// }

private Observable<PreparedStatement> getPrepForAllTempTablesWithoutType(TempStatement ts) {
private Observable<PreparedStatement> getPrepForAllTempTables(TempStatement ts) {
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
.map(pMap -> pMap.get(getMapKey(MetricType.UNDEFINED, ts)));

}

private Observable<PreparedStatement> getPrepForAllTempTables(TempStatement ts) {
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
.zipWith(Observable.just(MetricType.UNDEFINED), (pMap, metricType) -> pMap.get(getMapKey(metricType,
ts)));
// .zipWith(Observable.from(MetricType.all()).filter(t -> t != STRING).doOnNext(mt -> System.out.printf
// ("Next: %s\n", mt.toString())),
// (pMap, metricType) -> {
// PreparedStatement preparedStatement = ;
// if(preparedStatement == null) {
// System.out.printf("Could not find preparedStatement for %s ; %s\n", metricType
// .toString(), ts.name());
// }
// return preparedStatement;
// });
}

@Override
public Observable<Row> findAllMetricsInData() {
return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
Expand Down Expand Up @@ -1125,10 +1108,14 @@ String getTempTableName(long timestamp) {
// }

PreparedStatement getTempStatement(MetricType type, TempStatement ts, long timestamp) {
return prepMap
.floorEntry(timestamp)
.getValue()
.get(getMapKey(type, ts));
Map.Entry<Long, Map<Integer, PreparedStatement>> floorEntry = prepMap
.floorEntry(timestamp);

if(floorEntry != null) {
return floorEntry.getValue()
.get(getMapKey(type, ts));
}
return null;
}

@Override
Expand Down Expand Up @@ -1357,11 +1344,9 @@ public <T> Observable<Row> findTempData(MetricId<T> id, long startTime, long end
// Depending on the order, these must be read in the correct order also..

if(startKey == null) {
// The start time is already compressed, start the request from earliest non-compressed
startKey = prepMap.ceilingKey(startTime);
}
if(endKey == null) {
endKey = prepMap.ceilingKey(endTime);
}

if (order == Order.ASC) {
SortedMap<Long, Map<Integer, PreparedStatement>> statementMap = prepMap.subMap(startKey, true, endKey,
Expand Down Expand Up @@ -1596,23 +1581,20 @@ private class TemporaryTableStatementCreator implements SchemaChangeListener {

private final CoreLogger log = CoreLogging.getCoreLogger(TemporaryTableStatementCreator.class);

@Override public void onTableAdded(TableMetadata tableMetadata) {
log.infof("Registering prepared statements for table %s", tableMetadata.getName());

Map<Integer, PreparedStatement> statementMap = new HashMap<>();
prepareTempStatements(statementMap, tableMetadata.getName());

// Find the integer key and insert to the prepMap
Long mapKey = tableToMapKey(tableMetadata.getName());
prepMap.put(mapKey, statementMap);
@Override
public void onTableAdded(TableMetadata tableMetadata) {
if(tableMetadata.getName().startsWith(TEMP_TABLE_NAME_FORMAT)) {
log.infof("Registering prepared statements for table %s", tableMetadata.getName());
prepareTempStatements(tableMetadata.getName());
}
}

@Override public void onTableRemoved(TableMetadata tableMetadata) {
log.infof("Removing prepared statements for table %s", tableMetadata.getName());

// Find the integer key and remove from prepMap
Long mapKey = tableToMapKey(tableMetadata.getName());
prepMap.remove(mapKey);
@Override
public void onTableRemoved(TableMetadata tableMetadata) {
if(tableMetadata.getName().startsWith(TEMP_TABLE_NAME_FORMAT)) {
log.infof("Removing prepared statements for table %s", tableMetadata.getName());
removeTempStatements(tableMetadata.getName());
}
}

// Rest are not interesting to us
Expand Down Expand Up @@ -1642,4 +1624,10 @@ private class TemporaryTableStatementCreator implements SchemaChangeListener {
@Override public void onRegister(Cluster cluster) {}
@Override public void onUnregister(Cluster cluster) {}
}

void removeTempStatements(String tableName) {
// Find the integer key and remove from prepMap
Long mapKey = tableToMapKey(tableName);
prepMap.remove(mapKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,6 @@ public Observable<List<NumericBucketPoint>> findCounterStats(MetricId<Long> id,
percentiles) {
TimeRange timeRange = bucketConfig.getTimeRange();
checkArgument(isValidTimeRange(timeRange.getStart(), timeRange.getEnd()), "Invalid time range");
System.out.printf("----------_> CAN'T GET IT");
return findDataPoints(id, timeRange.getStart(), timeRange.getEnd(), 0, ASC)
.doOnError(Throwable::printStackTrace)
.compose(new NumericBucketPointTransformer(bucketConfig.getBuckets(), percentiles));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ protected <T> void doAction(Supplier<Observable<T>> fn) {
Observable<T> observable = fn.get().doOnError(Throwable::printStackTrace);
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
for (Throwable throwable : subscriber.getOnErrorEvents()) {
throwable.printStackTrace();
}
subscriber.assertNoErrors();
subscriber.assertCompleted();

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,8 @@ public class DataAccessITest extends BaseITest {

@BeforeClass
public void initClass() {
dataAccess = new DataAccessImpl(session);
try {
// Wait for the SchemaChangeListener to do its work
Thread.sleep(2000);
} catch (InterruptedException e) {
// e.printStackTrace();
}
this.dataAccess = (DataAccessImpl) TestDataAccessFactory.newInstance(session);

truncateTenants = session.prepare("TRUNCATE tenants");
truncateGaugeData = session.prepare("TRUNCATE data");
truncateCompressedData = session.prepare("TRUNCATE data_compressed");
Expand Down Expand Up @@ -119,8 +114,8 @@ public void doNotAllowDuplicateTenants() throws Exception {

@Test
public void insertAndFindGaugeRawData() throws Exception {
DateTime start = now().minusMinutes(10);
DateTime end = start.plusMinutes(6);
DateTime start = now();
DateTime end = start.plusMinutes(16);

Metric<Double> metric = new Metric<>(new MetricId<>("tenant-1", GAUGE, "metric-1"), asList(
new DataPoint<>(start.getMillis(), 1.23),
Expand Down Expand Up @@ -149,8 +144,8 @@ public void insertAndFindGaugeRawData() throws Exception {

@Test
public void addMetadataToGaugeRawData() throws Exception {
DateTime start = now().minusMinutes(10);
DateTime end = start.plusMinutes(6);
DateTime start = now();
DateTime end = start.plusMinutes(16);
String tenantId = "tenant-1";

Metric<Double> metric = new Metric<>(new MetricId<>(tenantId, GAUGE, "metric-1"), asList(
Expand All @@ -160,7 +155,7 @@ public void addMetadataToGaugeRawData() throws Exception {
new DataPoint<>(end.getMillis(), 1.234)
));

dataAccess.insertData(Observable.just(metric)).toBlocking().last();
doAction(() -> dataAccess.insertData(Observable.just(metric)).doOnError(Throwable::printStackTrace));

Observable<Row> observable = dataAccess.findTempData(new MetricId<>("tenant-1", GAUGE, "metric-1"),
start.getMillis(), end.getMillis(), 0, Order.DESC, DEFAULT_PAGE_SIZE);
Expand All @@ -180,8 +175,8 @@ public void addMetadataToGaugeRawData() throws Exception {

@Test
public void insertAndFindAvailabilities() throws Exception {
DateTime start = now().minusMinutes(10);
DateTime end = start.plusMinutes(6);
DateTime start = now();
DateTime end = start.plusMinutes(16);
String tenantId = "avail-test";
Metric<AvailabilityType> metric = new Metric<>(new MetricId<>(tenantId, AVAILABILITY, "m1"),
singletonList(new DataPoint<>(start.getMillis(), UP)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.service;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import org.hawkular.metrics.datetime.DateTimeService;
import org.joda.time.DateTime;

import com.datastax.driver.core.Session;

/**
* @author Michael Burman
*/
public class TestDataAccessFactory {

public static DataAccess newInstance(Session session) {
final CountDownLatch latch = new CountDownLatch(3);
DataAccessImpl dataAccess = new DataAccessImpl(session) {
@Override
void prepareTempStatements(String tableName) {
super.prepareTempStatements(tableName);
if (latch.getCount() > 0) {
latch.countDown();
}
}
};
dataAccess.createTempTablesIfNotExists(tableListForTesting())
.subscribe();
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return dataAccess;
}

/**
* Create few temporary tables for tests
*/
static Set<Long> tableListForTesting() {
Set<Long> tempTables = new HashSet<>(2);
DateTime now = DateTimeService.now.get();
tempTables.add(now.minusHours(2).getMillis());
tempTables.add(now.getMillis());
tempTables.add(now.plusHours(2).getMillis());
return tempTables;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void addAvailabilityForMultipleMetrics() throws Exception {
new DataPoint<>(start.plusSeconds(30).getMillis(), UP)));
Metric<AvailabilityType> m3 = new Metric<>(new MetricId<>(tenantId, AVAILABILITY, "m3"));

metricsService.addDataPoints(AVAILABILITY, Observable.just(m1, m2, m3)).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.addDataPoints(AVAILABILITY, Observable.just(m1, m2, m3)));

List<DataPoint<AvailabilityType>> actual = metricsService.findDataPoints(m1.getMetricId(),
start.getMillis(), end.getMillis(), 0, Order.ASC).toList().toBlocking().last();
Expand All @@ -100,9 +100,9 @@ public void addAvailabilityForMultipleMetrics() throws Exception {
asList(
new DataPoint<>(start.plusMinutes(2).getMillis(), UP),
new DataPoint<>(end.plusMinutes(2).getMillis(), UP)));
metricsService.createMetric(m4, false).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.createMetric(m4, false));

metricsService.addDataPoints(AVAILABILITY, Observable.just(m4)).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.addDataPoints(AVAILABILITY, Observable.just(m4)));

actual = metricsService.findDataPoints(m4.getMetricId(), start.getMillis(), end.getMillis(), 0, Order.DESC)
.toList().toBlocking().last();
Expand All @@ -118,12 +118,12 @@ public void addAvailabilityForMultipleMetrics() throws Exception {

@Test
public void findDistinctAvailabilities() throws Exception {
DateTime end = now();
DateTime start = end.minusMinutes(20);
DateTime start = now();
DateTime end = start.plusMinutes(20);
String tenantId = "tenant1";
MetricId<AvailabilityType> metricId = new MetricId<>("tenant1", AVAILABILITY, "A1");

metricsService.createTenant(new Tenant(tenantId), false).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.createTenant(new Tenant(tenantId), false));

Metric<AvailabilityType> metric = new Metric<>(metricId, asList(
new DataPoint<>(start.getMillis(), UP),
Expand All @@ -139,10 +139,12 @@ public void findDistinctAvailabilities() throws Exception {
new DataPoint<>(start.plusMinutes(10).getMillis(), ADMIN),
new DataPoint<>(start.plusMinutes(11).getMillis(), UP)));

metricsService.addDataPoints(AVAILABILITY, Observable.just(metric)).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.addDataPoints(AVAILABILITY, Observable.just(metric)));

List<DataPoint<AvailabilityType>> actual = metricsService.findAvailabilityData(metricId,
start.getMillis(), end.getMillis(), true, 0, Order.ASC).toList().toBlocking().lastOrDefault(null);
start.getMillis(), end.getMillis(), true, 0, Order.ASC)
.doOnError(Throwable::printStackTrace)
.toList().toBlocking().lastOrDefault(null);

List<DataPoint<AvailabilityType>> expected = asList(
metric.getDataPoints().get(0),
Expand Down

0 comments on commit c0f75b2

Please sign in to comment.