Skip to content

Commit

Permalink
data_temp_0 as fallback instead of data
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed Jul 4, 2017
1 parent 6c0d40a commit a05dd42
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ private void startMetricsService() {

new CassandraDriverMetrics(session, metricRegistry).registerAll();

initJobsService();

if (Boolean.valueOf(metricsReportingEnabled)) {
DropWizardReporter reporter = new DropWizardReporter(metricRegistry, metricNameService, metricsService);
int interval = Integer.getInteger(collectionIntervalConfig, 180);
Expand All @@ -460,8 +462,6 @@ private void startMetricsService() {

metricsServiceReady.fire(new ServiceReadyEvent(metricsService.insertedDataEvents()));

initJobsService();

initGCGraceSecondsManager();

if (Boolean.parseBoolean(jmxReportingEnabled)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class DataAccessImpl implements DataAccess {

private static final CoreLogger log = CoreLogging.getCoreLogger(DataAccessImpl.class);

public static final String OUT_OF_ORDER_TABLE_NAME = "data_temp_0";
public static final String TEMP_TABLE_NAME_PROTOTYPE = "data_temp_";
public static final String TEMP_TABLE_NAME_FORMAT_STRING = TEMP_TABLE_NAME_PROTOTYPE + "%s";

Expand Down Expand Up @@ -342,9 +343,9 @@ public StatementType getType() {

private static DateTimeFormatter TEMP_TABLE_DATEFORMATTER = (new DateTimeFormatterBuilder())
.appendValue(ChronoField.YEAR, 4)
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.appendValue(ChronoField.HOUR_OF_DAY, 2).toFormatter();
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.appendValue(ChronoField.HOUR_OF_DAY, 2).toFormatter();

private CodecRegistry codecRegistry;
private Metadata metadata;
Expand Down Expand Up @@ -380,13 +381,9 @@ private Integer getMapKey(MetricType type, TempStatement ts) {
return getMapKey(type.getCode(), ts.ordinal());
}

void prepareTempStatements(String tableName) {
// Proceed to create the preparedStatements against this table
Long mapKey = tableToMapKey(tableName);

void prepareTempStatements(String tableName, Long mapKey) {
// Create an entry to the correct Long
Map<Integer, PreparedStatement> statementMap = new HashMap<>();
prepMap.put(mapKey, statementMap);

// Per metricType
for (MetricType<?> metricType : MetricType.userTypes()) {
Expand Down Expand Up @@ -429,6 +426,7 @@ void prepareTempStatements(String tableName) {
PreparedStatement prepared = session.prepare(formatSt);
statementMap.put(key, prepared);
}
prepMap.put(mapKey, statementMap);
}

@Override
Expand Down Expand Up @@ -461,83 +459,49 @@ Observable<ResultSet> createTemporaryTable(String tempTableName) {
void checkTempOperationalStatus(int preparedTempTables) {
if(preparedTempTables < 1) {
String tableName = getTempTableName(DateTimeService.now.get().getMillis());
createTemporaryTable(tableName);
createTemporaryTable(tableName).toBlocking().subscribe();
}
}

private void initializeTemporaryTableStatements() {
prepMap = new ConcurrentSkipListMap<>();
setTempTableCreator(new TemporaryTableStatementCreator());
// tableCreator = new TemporaryTableStatementCreator();

int preparedTempTables = 0;
boolean zeroTableExists = false;

// At startup we should initialize preparedStatements for all the temporary tables that exists
for (TableMetadata table : metadata.getKeyspace(session.getLoggedKeyspace()).getTables()) {
if(table.getName().startsWith(TEMP_TABLE_NAME_PROTOTYPE)) {
prepareTempStatements(table.getName());
preparedTempTables++;
// Proceed to create the preparedStatements against this table
Long mapKey = tableToMapKey(table.getName());
prepareTempStatements(table.getName(), mapKey);
// preparedTempTables++;
} else if(table.getName().equals(OUT_OF_ORDER_TABLE_NAME)) {
zeroTableExists = true;
}
}

if(!zeroTableExists) {
createTemporaryTable(OUT_OF_ORDER_TABLE_NAME).toBlocking().subscribe();
}

// TempTableCreatorListener should take care of creating tables, but we'll need at least one to be able to process
// writes
checkTempOperationalStatus(preparedTempTables);
// checkTempOperationalStatus(preparedTempTables);

// Prepare the old fashioned way (data table) as fallback when out-of-order writes happen..
// These should be transparent in writes
if(false) { // TODO Replace with configurable setting if this is enabled or not
Map<Integer, PreparedStatement> statementMap = new HashMap<>();
for (MetricType<?> metricType : MetricType.all()) {
// Reads
PreparedStatement prepared = session.prepare(
String.format(TempStatement.dateRangeExclusive.getStatement(), metricTypeToColumnName(metricType), "data")
);
Integer key = getMapKey(metricType, TempStatement.dateRangeExclusive);
statementMap.put(key, prepared);

prepared = session.prepare(
String.format(TempStatement.dateRangeExclusiveWithLimit.getStatement(), metricTypeToColumnName(metricType)
, "data")
);
key = getMapKey(metricType, TempStatement.dateRangeExclusiveWithLimit);
statementMap.put(key, prepared);

prepared = session.prepare(
String.format(TempStatement.dataByDateRangeExclusiveASC.getStatement(), metricTypeToColumnName(metricType)
, "data")
);
key = getMapKey(metricType, TempStatement.dataByDateRangeExclusiveASC);
statementMap.put(key, prepared);

prepared = session.prepare(
String.format(TempStatement.dataByDateRangeExclusiveWithLimitASC.getStatement(), metricTypeToColumnName
(metricType), "data")
);

key = getMapKey(metricType, TempStatement.dataByDateRangeExclusiveWithLimitASC);
statementMap.put(key, prepared);
prepareTempStatements(OUT_OF_ORDER_TABLE_NAME, 0L); // Fall back is always at value 0 (floorKey/floorEntry will hit it)
}

// Writes
prepared = session.prepare(
String.format(TempStatement.INSERT_DATA.getStatement(), "data",
metricTypeToColumnName(metricType))
);
// EEK: CHECK_EXISTENCE_OF_METRIC_IN_TABLE

key = getMapKey(metricType, TempStatement.INSERT_DATA);
statementMap.put(key, prepared);

prepared = session.prepare(
String.format(TempStatement.INSERT_DATA_WITH_TAGS.getStatement(), "data",
metricTypeToColumnName(metricType))
);

key = getMapKey(metricType, TempStatement.INSERT_DATA_WITH_TAGS);
statementMap.put(key, prepared);
}
prepMap.put(0L, statementMap); // Fall back is always at value 0 (floorKey/floorEntry will hit it)
}
}
// private String modifyToUuidFormat(String st) {
// st = st.replace("time = ?", "time = minTimeuuid(?)");
// st = st.replace("time >= ?", "time >= minTimeuuid(?)");
// st = st.replace("time < ?", "time < maxTimeuuid(?)");
// return st;
// }

private String metricTypeToColumnName(MetricType<?> type) {
switch(type.getCode()) {
Expand Down Expand Up @@ -904,6 +868,15 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
*/
@Override
public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
PreparedStatement ts =
getTempStatement(MetricType.UNDEFINED, TempStatement.SCAN_WITH_TOKEN_RANGES, timestamp);

// The table does not exists - case such as when starting Hawkular-Metrics for the first time just before
// compression kicks in
if(ts == null) {
return Observable.empty();
}

return Observable.from(getTokenRanges())
.map(tr -> rxSession.executeAndFetch(
getTempStatement(MetricType.UNDEFINED, TempStatement.SCAN_WITH_TOKEN_RANGES, timestamp)
Expand Down Expand Up @@ -1024,6 +997,7 @@ PreparedStatement getTempStatement(MetricType type, TempStatement ts, long times
return floorEntry.getValue()
.get(getMapKey(type, ts));
}
// We should never be here..
return null;
}

Expand All @@ -1040,27 +1014,21 @@ private <T> Observable.Transformer<DataPoint<T>, BoundStatement> mapTempInsertSt
MetricType<T> type = metric.getMetricId().getType();
MetricId<T> metricId = metric.getMetricId();

/*
TODO If the NavigableMap returns the deprecated "data" table, then we'll need to modify this insert to bind
timeUuid instead of Date
*/

return tO -> tO
.map(dataPoint -> {
BoundStatement bs;
int i = 1;
PreparedStatement st;
if (dataPoint.getTags().isEmpty()) {
PreparedStatement st =
getTempStatement(type, TempStatement.INSERT_DATA, dataPoint.getTimestamp());
st = getTempStatement(type, TempStatement.INSERT_DATA, dataPoint.getTimestamp());

if(st == null) {
return null;
}

bs = st.bind();
} else {
PreparedStatement st =
getTempStatement(type, TempStatement.INSERT_DATA_WITH_TAGS, dataPoint.getTimestamp());
st = getTempStatement(type, TempStatement.INSERT_DATA_WITH_TAGS, dataPoint.getTimestamp());

if(st == null) {
return null;
Expand Down Expand Up @@ -1161,6 +1129,7 @@ public Observable<Integer> insertStringData(Metric<String> metric, int ttl, int
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

// TODO These are only used by the String methods
private BoundStatement bindDataPoint(PreparedStatement statement, Metric<?> metric, Object value, long timestamp) {
MetricId<?> metricId = metric.getMetricId();
return statement.bind(value, metricId.getTenantId(), metricId.getType().getCode(), metricId.getName(),
Expand Down Expand Up @@ -1216,10 +1185,8 @@ private SortedMap<Long, Map<Integer, PreparedStatement>> subSetMap(long startTim
Long startKey = prepMap.floorKey(startTime);
Long endKey = prepMap.floorKey(endTime);

// Depending on the order, these must be read in the correct order also..

// The start time is already compressed, start the request from earliest non-compressed
if(startKey == null) {
// The start time is already compressed, start the request from earliest non-compressed
startKey = prepMap.ceilingKey(startTime);
}

Expand All @@ -1228,13 +1195,13 @@ private SortedMap<Long, Map<Integer, PreparedStatement>> subSetMap(long startTim
endKey = startKey;
}

// Depending on the order, these must be read in the correct order also..
SortedMap<Long, Map<Integer, PreparedStatement>> statementMap;
if(order == Order.ASC) {
statementMap = prepMap.subMap(startKey, true, endKey,
true);
} else {
statementMap = new ConcurrentSkipListMap<>(
(var0, var2) -> var0 < var2?1:(var0 == var2?0:-1));
statementMap = new ConcurrentSkipListMap<>((var0, var2) -> var0 < var2?1:(var0 == var2?0:-1));
statementMap.putAll(prepMap.subMap(startKey, true, endKey, true));
}

Expand Down Expand Up @@ -1480,11 +1447,11 @@ private class TemporaryTableStatementCreator implements SchemaChangeListener {

@Override
public void onTableAdded(TableMetadata tableMetadata) {
log.infof("Table added %s", tableMetadata.getName());
log.debugf("Table added %s", tableMetadata.getName());
if(tableMetadata.getName().startsWith(TEMP_TABLE_NAME_PROTOTYPE)) {
log.infof("Registering prepared statements for table %s", tableMetadata.getName());
Observable.fromCallable(() -> {
prepareTempStatements(tableMetadata.getName());
prepareTempStatements(tableMetadata.getName(), tableToMapKey(tableMetadata.getName()));
return null;
})
.subscribeOn(Schedulers.io())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.hawkular.metrics.core.service;

import static java.time.ZoneOffset.UTC;

import static org.hawkular.metrics.core.service.Functions.isValidTagMap;
import static org.hawkular.metrics.core.service.Functions.makeSafe;
import static org.hawkular.metrics.core.service.Order.ASC;
Expand All @@ -27,6 +29,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand Down Expand Up @@ -279,6 +282,8 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
setDefaultTTL(session, keyspace);
initMetrics();

verifyAndCreateTempTables();

tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this);
expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this);
}
Expand Down Expand Up @@ -715,6 +720,17 @@ private <T> Observable.Transformer<T, T> applyRetryPolicy() {
.onErrorResumeNext(Observable.empty());
}

/**
* Intended to be used at the startup of the MetricsServiceImpl to ensure we have enough tables for processing
*/
public void verifyAndCreateTempTables() {
ZonedDateTime currentBlock = ZonedDateTime.ofInstant(Instant.ofEpochMilli(DateTimeService.now.get().getMillis()), UTC)
.with(DateTimeService.startOfPreviousEvenHour());

ZonedDateTime lastStartupBlock = currentBlock.plus(6, ChronoUnit.HOURS);
verifyAndCreateTempTables(currentBlock, lastStartupBlock).await();
}

@Override
public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateTime endTime) {
Set<Long> timestamps = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public static DataAccess newInstance(Session session) {
final CountDownLatch latch = new CountDownLatch(3);
DataAccessImpl dataAccess = new DataAccessImpl(session) {
@Override
void prepareTempStatements(String tableName) {
super.prepareTempStatements(tableName);
void prepareTempStatements(String tableName, Long mapKey) {
super.prepareTempStatements(tableName, mapKey);
if (latch.getCount() > 0) {
latch.countDown();
}
Expand Down

0 comments on commit a05dd42

Please sign in to comment.