Skip to content

Commit

Permalink
Moving to another machine..
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Burman authored and burmanm committed Jul 4, 2017
1 parent 3ab5f24 commit 53561f7
Showing 1 changed file with 145 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -74,6 +76,8 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SchemaChangeListener;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.TokenRange;
Expand Down Expand Up @@ -221,17 +225,17 @@ public StatementType getType() {
private static String findAllMetricsInDataBases = "SELECT DISTINCT tenant_id, type, metric, dpart " +
"FROM %s";

private PreparedStatement[][] dateRangeExclusive;
private PreparedStatement[][] dateRangeExclusiveWithLimit;
private PreparedStatement[][] dataByDateRangeExclusiveASC;
private PreparedStatement[][] dataByDateRangeExclusiveWithLimitASC;
private PreparedStatement[] scanTempTableWithTokens;

private PreparedStatement[] metricsInDatas;
private PreparedStatement[] allMetricsInDatas;

private PreparedStatement[][] dataTable;
private PreparedStatement[][] dataWithTagsTable;
// private PreparedStatement[][] dateRangeExclusive;
// private PreparedStatement[][] dateRangeExclusiveWithLimit;
// private PreparedStatement[][] dataByDateRangeExclusiveASC;
// private PreparedStatement[][] dataByDateRangeExclusiveWithLimitASC;
// private PreparedStatement[] scanTempTableWithTokens;
//
// private PreparedStatement[] metricsInDatas;
// private PreparedStatement[] allMetricsInDatas;
//
// private PreparedStatement[][] dataTable;
// private PreparedStatement[][] dataWithTagsTable;

private PreparedStatement insertTenant;

Expand Down Expand Up @@ -381,9 +385,9 @@ private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap,
case SCAN:
formatSt = String.format(st.getStatement(), tableName);
break;
case CREATE:
formatSt = String.format(st.getStatement(), tableName);
break;
// case CREATE:
// formatSt = String.format(st.getStatement(), tableName);
// break;
default:
// Not supported
continue;
Expand All @@ -395,15 +399,26 @@ private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap,
}
}

Observable<ResultSet> createTemporaryTable(long timestamp) {
String tempTableName = getTempTableName(timestamp);
SimpleStatement st =
new SimpleStatement(String.format(TempStatement.CREATE_TABLE.getStatement(), tempTableName));

return rxSession.execute(st);
// The statement preparation is handled by the schemaListener
}

private void initializeTemporaryTableStatements() {
prepMap = new ConcurrentSkipListMap<>();
session.getCluster().register(new TemporaryTableStatementCreator());

Long mapKey = null;

// At startup we should initialize preparedStatements for all the temporary tables that are existing
for (TableMetadata table : metadata.getKeyspace(session.getLoggedKeyspace()).getTables()) {
if(table.getName().startsWith(TEMP_TABLE_PROTOTYPE)) {
// Proceed to create the preparedStatements against this table
Long mapKey = tableToMapKey(table.getName());
mapKey = tableToMapKey(table.getName());

// Create an entry to the correct Long
Map<Integer, PreparedStatement> statementMap = new HashMap<>();
Expand All @@ -414,78 +429,84 @@ private void initializeTemporaryTableStatements() {
}
}

// TODO Create more tables if not sufficient amount exists, define sufficient..
if(mapKey == null) {
// This should be a time range
createTemporaryTable(System.currentTimeMillis());
}

// These are for the ring buffer strategy (slightly faster but Cassandra 3.x has issues with consistent schema)

// Read statements
dateRangeExclusive = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
dateRangeExclusiveWithLimit = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
dataByDateRangeExclusiveASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
dataByDateRangeExclusiveWithLimitASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
scanTempTableWithTokens = new PreparedStatement[NUMBER_OF_TEMP_TABLES];

// Insert statements
dataTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];
dataWithTagsTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];

// MetricDefinition statements
metricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];
allMetricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];

// Initialize all the temporary tables for inserts
for (MetricType<?> metricType : MetricType.all()) {
for(int k = 0; k < NUMBER_OF_TEMP_TABLES; k++) {
// String metrics are not yet supported with temp tables
if(metricType.getCode() == 4) {
continue;
}

String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, k);

// Insert statements
dataTable[metricType.getCode()][k] = session.prepare(
String.format(data, tempTableName, metricTypeToColumnName(metricType))
);
dataWithTagsTable[metricType.getCode()][k] = session.prepare(
String.format(dataWithTags, tempTableName, metricTypeToColumnName(metricType))
);
// Read statements
dateRangeExclusive[metricType.getCode()][k] = session.prepare(
String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), tempTableName)
);
dateRangeExclusiveWithLimit[metricType.getCode()][k] = session.prepare(
String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), tempTableName)
);
dataByDateRangeExclusiveASC[metricType.getCode()][k] = session.prepare(
String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), tempTableName)
);
dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][k] = session.prepare(
String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), tempTableName)
);
}
// Then initialize the old fashion ones..
dateRangeExclusive[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), "data")
);
dateRangeExclusiveWithLimit[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), "data")
);
dataByDateRangeExclusiveASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), "data")
);
dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), "data")
);
}

// MetricDefinition statements
for(int i = 0; i < NUMBER_OF_TEMP_TABLES; i++) {
String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, i);
metricsInDatas[i] = session.prepare(String.format(findMetricInDataBase, tempTableName));
allMetricsInDatas[i] = session.prepare(String.format(findAllMetricsInDataBases, tempTableName));
scanTempTableWithTokens[i] = session.prepare(String.format(scanTableBase, tempTableName));
}
metricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findMetricInDataBase, "data"));
allMetricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findAllMetricsInDataBases, "data"));
// dateRangeExclusive = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dateRangeExclusiveWithLimit = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dataByDateRangeExclusiveASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dataByDateRangeExclusiveWithLimitASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// scanTempTableWithTokens = new PreparedStatement[NUMBER_OF_TEMP_TABLES];
//
// // Insert statements
// dataTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];
// dataWithTagsTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];
//
// // MetricDefinition statements
// metricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];
// allMetricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];
//
// // Initialize all the temporary tables for inserts
// for (MetricType<?> metricType : MetricType.all()) {
// for(int k = 0; k < NUMBER_OF_TEMP_TABLES; k++) {
// // String metrics are not yet supported with temp tables
// if(metricType.getCode() == 4) {
// continue;
// }
//
// String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, k);
//
// // Insert statements
// dataTable[metricType.getCode()][k] = session.prepare(
// String.format(data, tempTableName, metricTypeToColumnName(metricType))
// );
// dataWithTagsTable[metricType.getCode()][k] = session.prepare(
// String.format(dataWithTags, tempTableName, metricTypeToColumnName(metricType))
// );
// // Read statements
// dateRangeExclusive[metricType.getCode()][k] = session.prepare(
// String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dateRangeExclusiveWithLimit[metricType.getCode()][k] = session.prepare(
// String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dataByDateRangeExclusiveASC[metricType.getCode()][k] = session.prepare(
// String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][k] = session.prepare(
// String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), tempTableName)
// );
// }
// // Then initialize the old fashion ones..
// dateRangeExclusive[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), "data")
// );
// dateRangeExclusiveWithLimit[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), "data")
// );
// dataByDateRangeExclusiveASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), "data")
// );
// dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), "data")
// );
// }
//
// // MetricDefinition statements
// for(int i = 0; i < NUMBER_OF_TEMP_TABLES; i++) {
// String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, i);
// metricsInDatas[i] = session.prepare(String.format(findMetricInDataBase, tempTableName));
// allMetricsInDatas[i] = session.prepare(String.format(findAllMetricsInDataBases, tempTableName));
// scanTempTableWithTokens[i] = session.prepare(String.format(scanTableBase, tempTableName));
// }
// metricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findMetricInDataBase, "data"));
// allMetricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findAllMetricsInDataBases, "data"));
}

private String metricTypeToColumnName(MetricType<?> type) {
Expand Down Expand Up @@ -987,6 +1008,13 @@ public int getBucketIndex(long timestamp) {
return Math.floorDiv(hour, 2);
}

PreparedStatement getTempStatement(MetricType type, TempStatement ts, long timestamp) {
return prepMap
.floorEntry(timestamp)
.getValue()
.get(getMapKey(type.getCode(), ts.ordinal()));
}

@Override
public <T> Observable<Integer> insertData(Observable<Metric<T>> metrics) {
return metrics
Expand All @@ -1003,16 +1031,32 @@ private <T> Observable.Transformer<DataPoint<T>, BoundStatement> mapTempInsertSt
/*
TODO If the NavigableMap returns a null (no matching temp table exists anymore) then the insert is too far
behind (and then we should decide what to do with it.. write to data table or just ignore / throw error ?
For now we'll ignore it and continue with the next
*/

return tO -> tO
.map(dataPoint -> {
BoundStatement bs;
int i = 1;
if (dataPoint.getTags().isEmpty()) {
bs = dataTable[type.getCode()][getBucketIndex(dataPoint.getTimestamp())].bind();
PreparedStatement st =
getTempStatement(type, TempStatement.INSERT_DATA, dataPoint.getTimestamp());

if(st == null) {
return null;
}

bs = st.bind();
} else {
bs = dataWithTagsTable[type.getCode()][getBucketIndex(dataPoint.getTimestamp())].bind();
PreparedStatement st =
getTempStatement(type, TempStatement.INSERT_DATA_WITH_TAGS, dataPoint.getTimestamp());

if(st == null) {
return null;
}

bs = st.bind();
bs.setMap(1, dataPoint.getTags());
i++;
}
Expand All @@ -1023,7 +1067,8 @@ TODO If the NavigableMap returns a null (no matching temp table exists anymore)
.setString(++i, metricId.getName())
.setLong(++i, DPART)
.setTimestamp(++i, new Date(dataPoint.getTimestamp()));
});
})
.filter(Objects::nonNull);
}

private <T> void bindValue(BoundStatement bs, MetricType<T> type, DataPoint<T> dataPoint) {
Expand Down Expand Up @@ -1188,12 +1233,23 @@ private Integer[] tempBuckets(long startTime, long endTime, Order order) {
@Override
public <T> Observable<Row> findTempData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
// Find out which tables we'll need to scan
Integer[] buckets = tempBuckets(startTime, endTime, order);
MetricType<T> type = id.getType();
// Find out which tables we'll need to scan
// Integer[] buckets = tempBuckets(startTime, endTime, order);

Long startKey = prepMap.floorKey(startTime);
Long endKey = prepMap.floorKey(endTime);

SortedMap<Long, Map<Integer, PreparedStatement>> statementMap = prepMap.subMap(startKey, endKey);
Observable<Map<Integer, PreparedStatement>> buckets = Observable.from(statementMap.values());

if (order == Order.ASC) {
if (limit <= 0) {
buckets
.map(m -> m.get(getMapKey(type.getCode(), TempStatement.dataByDateRangeExclusiveASC.ordinal())))
.concatMap(p -> rxSession.execute(p
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
new Date(startTime), new Date(endTime)).setFetchSize(pageSize)));
return Observable.from(buckets)
.concatMap(i -> rxSession.executeAndFetch(dataByDateRangeExclusiveASC[type.getCode()][i]
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
Expand Down

0 comments on commit 53561f7

Please sign in to comment.