Commit
Remove old_data idea for now, migrate all the functions to datetime-tables
burmanm committed Jul 4, 2017
1 parent 53561f7 commit 2922cf3
Showing 4 changed files with 136 additions and 113 deletions.
@@ -65,16 +65,14 @@ public interface DataAccess {

<T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType<T> type);

Observable<Row> findMetricsInTempTable(long timestamp);

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize);

/*
https://issues.apache.org/jira/browse/CASSANDRA-11143
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
Completable resetTempTable(long timestamp);
// Completable resetTempTable(long timestamp);

Observable<Row> findAllMetricsInData();

@@ -93,8 +91,8 @@ Observable<Row> findCompressedData(MetricId<?> id, long startTime, long endTime,
<T> Observable<Row> findTempData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize);

<T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize);
// <T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
// int pageSize);

Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order,
int pageSize);
@@ -88,6 +88,7 @@
import rx.Completable;
import rx.Observable;
import rx.exceptions.Exceptions;
import rx.functions.Func2;

/**
*
@@ -102,6 +103,7 @@ public class DataAccessImpl implements DataAccess {
public static final String TEMP_TABLE_NAME_FORMAT_STRING = TEMP_TABLE_PROTOTYPE + "%s";
public static final int NUMBER_OF_TEMP_TABLES = 12;
public static final int POS_OF_OLD_DATA = NUMBER_OF_TEMP_TABLES;

public static final long DPART = 0;
private Session session;
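NUMBER_OF_TEMP_TABLES = 12 suggests each temp table covers a fixed slice of the day. The bucket-index computation is not part of this diff; a minimal sketch, assuming two-hour windows:

private int getBucketIndex(long timestamp) {
    // Sketch only: the real getBucketIndex is not shown in this commit.
    // 24h / 12 tables = two-hour buckets (requires java.util.concurrent.TimeUnit).
    long msPerBucket = TimeUnit.DAYS.toMillis(1) / NUMBER_OF_TEMP_TABLES;
    return (int) ((timestamp % TimeUnit.DAYS.toMillis(1)) / msPerBucket);
}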

@@ -113,6 +115,9 @@ public class DataAccessImpl implements DataAccess {
// See getMapKey(byte, int)
private NavigableMap<Long, Map<Integer, PreparedStatement>> prepMap;

// TODO Move all of these to a new class (Cassandra specific temp table) to allow multiple implementations (such
// as in-memory + WAL in Cassandra)

private enum StatementType {
READ, WRITE, SCAN, CREATE
}
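The statement maps are keyed by a composite of metric-type code and statement ordinal. getMapKey's body is collapsed in this view; a plausible sketch of the packing:

private Integer getMapKey(byte code, int ordinal) {
    // Sketch only: the actual body is collapsed above. Packing the type code
    // into the high bits keeps every (type, statement) pair distinct.
    return (code << 8) | ordinal;
}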
@@ -368,6 +373,7 @@ private Integer getMapKey(byte code, int ordinal) {
}

private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap, String tableName) {
// Per metricType
for (MetricType<?> metricType : MetricType.all()) {
for (TempStatement st : TempStatement.values()) {
Integer key = getMapKey(metricType.getCode(), st.ordinal());
@@ -382,12 +388,6 @@ private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap,
formatSt = String.format(st.getStatement(), tableName,
metricTypeToColumnName(metricType));
break;
case SCAN:
formatSt = String.format(st.getStatement(), tableName);
break;
// case CREATE:
// formatSt = String.format(st.getStatement(), tableName);
// break;
default:
// Not supported
continue;
@@ -397,6 +397,23 @@ private void prepareTempStatements(Map<Integer, PreparedStatement> statementMap,
statementMap.put(key, prepared);
}
}
// Untyped
for (TempStatement st : TempStatement.values()) {
Integer key = getMapKey(MetricType.UNDEFINED.getCode(), st.ordinal());
String formatSt;
switch(st.getType()) {
case SCAN:
formatSt = String.format(st.getStatement(), tableName);
break;
case CREATE:
formatSt = String.format(st.getStatement(), tableName);
break;
default:
continue;
}
PreparedStatement prepared = session.prepare(formatSt);
statementMap.put(key, prepared);
}
}
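Keying the table-level SCAN and CREATE statements under MetricType.UNDEFINED keeps them in the same per-table map as the typed statements. The getTempStatement helper used further down is not shown in this diff; its assumed shape:

private PreparedStatement getTempStatement(MetricType<?> type, TempStatement st, long timestamp) {
    // Assumed shape, not shown in this commit: prepMap is taken to be keyed
    // by bucket start time, so floorEntry picks the table covering timestamp.
    Map<Integer, PreparedStatement> statementMap = prepMap.floorEntry(timestamp).getValue();
    return statementMap.get(getMapKey(type.getCode(), st.ordinal()));
}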

Observable<ResultSet> createTemporaryTable(long timestamp) {
@@ -776,7 +793,7 @@ public <T> ResultSetFuture insertMetricInMetricsIndex(Metric<T> metric, boolean

@Override
public <T> Observable<Row> findMetricInData(MetricId<T> id) {
return Observable.from(metricsInDatas)
return getPrepForAllTempTablesWithoutType(TempStatement.CHECK_EXISTENCE_OF_METRIC_IN_TABLE)
.map(b -> b.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART))
.flatMap(b -> rxSession.executeAndFetch(b))
.concatWith(rxSession.executeAndFetch(findMetricInData
@@ -861,12 +878,6 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
return rxSession.executeAndFetch(readMetricsIndex.bind(tenantId, type.getCode()));
}

@Override
public Observable<Row> findMetricsInTempTable(long timestamp) {
int bucket = getBucketIndex(timestamp);
return rxSession.executeAndFetch(allMetricsInDatas[bucket].bind());
}

/**
* Fetch all the data from a temporary table for the compression job. Using TokenRanges avoids first fetching
* all the metrics' partition keys and then requesting their data separately.
@@ -878,13 +889,15 @@ public Observable<Row> findMetricsInTempTable(long timestamp) {
*/
@Override
public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize) {
int bucket = getBucketIndex(timestamp);
// int bucket = getBucketIndex(timestamp);

return Observable.from(getTokenRanges())
.map(tr -> rxSession.executeAndFetch(
scanTempTableWithTokens[bucket].bind()
getTempStatement(MetricType.UNDEFINED, TempStatement.SCAN_WITH_TOKEN_RANGES, timestamp)
.bind()
.setToken(0, tr.getStart())
.setToken(1, tr.getEnd()).setFetchSize(pageSize)));
.setToken(1, tr.getEnd())
.setFetchSize(pageSize)));
}
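The getTokenRanges body is collapsed below. A minimal sketch, assuming the DataStax 3.x driver, where wrap-around ranges must be unwrapped before they can be bound to token(...) comparisons:

private Set<TokenRange> getTokenRanges() {
    // Sketch only: collect the cluster's token ranges and split any range
    // that wraps around the ring into contiguous pieces.
    Set<TokenRange> tokenRanges = new HashSet<>();
    for (TokenRange tokenRange : metadata.getTokenRanges()) {
        tokenRanges.addAll(tokenRange.unwrap());
    }
    return tokenRanges;
}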

private Set<TokenRange> getTokenRanges() {
@@ -900,36 +913,50 @@ private Set<TokenRange> getTokenRanges() {
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
@Override
public Completable resetTempTable(long timestamp) {
String fullTableName = String.format(TEMP_TABLE_NAME_FORMAT, getBucketIndex(timestamp));

return Completable.fromAction(() -> {
String reCreateCQL = metadata.getKeyspace(session.getLoggedKeyspace())
.getTable(fullTableName)
.asCQLQuery();

String dropCQL = String.format("DROP TABLE %s", fullTableName);

session.execute(dropCQL);
while(!session.getCluster().getMetadata().checkSchemaAgreement()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
session.execute(reCreateCQL);
});
// TODO Needs to reprepare all the preparedstatements after dropping..
// @Override
// public Completable resetTempTable(long timestamp) {
// String fullTableName = String.format(TEMP_TABLE_NAME_FORMAT, getBucketIndex(timestamp));
//
// return Completable.fromAction(() -> {
// String reCreateCQL = metadata.getKeyspace(session.getLoggedKeyspace())
// .getTable(fullTableName)
// .asCQLQuery();
//
// String dropCQL = String.format("DROP TABLE %s", fullTableName);
//
// session.execute(dropCQL);
// while(!session.getCluster().getMetadata().checkSchemaAgreement()) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// session.execute(reCreateCQL);
// });
// // TODO Needs to reprepare all the preparedstatements after dropping..
//
//// return Completable.fromObservable(rxSession.execute(dropCQL))
//// .andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
// }
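The TODO above is the blocker for re-enabling this: dropping the table invalidates every PreparedStatement bound to it. A sketch of the missing re-preparation step; the prepMap key derivation is an assumption, as it is not shown in this diff:

private void reprepareTempStatements(long bucketBaseTimestamp) {
    // Sketch only: rebuild the recreated bucket's statement map. Assumes the
    // prepMap key is the bucket's base timestamp.
    String tableName = String.format(TEMP_TABLE_NAME_FORMAT_STRING, getBucketIndex(bucketBaseTimestamp));
    Map<Integer, PreparedStatement> statementMap = new HashMap<>();
    prepareTempStatements(statementMap, tableName);
    prepMap.put(bucketBaseTimestamp, statementMap);
}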

private Observable<PreparedStatement> getPrepForAllTempTablesWithoutType(TempStatement ts) {
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
.map(pMap -> pMap.get(getMapKey(MetricType.UNDEFINED.getCode(), ts.ordinal())));
}

// return Completable.fromObservable(rxSession.execute(dropCQL))
// .andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
private Observable<PreparedStatement> getPrepForAllTempTablesAndTypes(TempStatement ts) {
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
.zipWith(Observable.from(MetricType.all()),
(pMap, metricType) -> pMap.get(getMapKey(metricType.getCode(), ts.ordinal())));
}

@Override
public Observable<Row> findAllMetricsInData() {
return Observable.from(allMetricsInDatas)
return getPrepForAllTempTablesAndTypes(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
.map(PreparedStatement::bind)
.flatMap(b -> rxSession.executeAndFetch(b))
.concatWith(
@@ -1245,67 +1272,70 @@ public <T> Observable<Row> findTempData(MetricId<T> id, long startTime, long end

if (order == Order.ASC) {
if (limit <= 0) {
buckets
return buckets
.map(m -> m.get(getMapKey(type.getCode(), TempStatement.dataByDateRangeExclusiveASC.ordinal())))
.concatMap(p -> rxSession.execute(p
.concatMap(p -> rxSession.executeAndFetch(p
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
new Date(startTime), new Date(endTime)).setFetchSize(pageSize)));
return Observable.from(buckets)
.concatMap(i -> rxSession.executeAndFetch(dataByDateRangeExclusiveASC[type.getCode()][i]
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
new Date(startTime), new Date(endTime)).setFetchSize(pageSize)));
new Date(startTime), new Date(endTime))
.setFetchSize(pageSize)));
} else {
return Observable.from(buckets)
.concatMap(i -> rxSession.executeAndFetch(dataByDateRangeExclusiveWithLimitASC[type.getCode()][i].bind(
id.getTenantId(), id.getType().getCode(), id.getName(), DPART, new Date(startTime),
new Date(endTime), limit).setFetchSize(pageSize)));
return buckets
.map(m -> m.get(getMapKey(type.getCode(), TempStatement.dataByDateRangeExclusiveWithLimitASC.ordinal())))
.concatMap(p -> rxSession.executeAndFetch(p
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART, new Date(startTime),
new Date(endTime), limit)
.setFetchSize(pageSize)));
}
} else {
if (limit <= 0) {
return Observable.from(buckets)
.concatMap(i -> rxSession.executeAndFetch(dateRangeExclusive[type.getCode()][i].bind(id.getTenantId(),
id.getType().getCode(), id.getName(), DPART, new Date(startTime), new Date(endTime))
return buckets
.map(m -> m.get(getMapKey(type.getCode(), TempStatement.dateRangeExclusive.ordinal())))
.concatMap(p -> rxSession.executeAndFetch(p
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART, new Date(startTime),
new Date(endTime))
.setFetchSize(pageSize)));
} else {
return Observable.from(buckets)
.concatMap(i -> rxSession.executeAndFetch(dateRangeExclusiveWithLimit[type.getCode()][i].bind(id.getTenantId(),
id.getType().getCode(), id.getName(), DPART, new Date(startTime), new Date(endTime),
limit).setFetchSize(pageSize)));
return buckets
.map(m -> m.get(getMapKey(type.getCode(), TempStatement.dateRangeExclusiveWithLimit.ordinal())))
.concatMap(p -> rxSession.executeAndFetch(p
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART, new Date(startTime), new Date(endTime),
limit)
.setFetchSize(pageSize)));
}
}
}
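The construction of buckets is collapsed above. One plausible derivation, again assuming prepMap is keyed by bucket start timestamps:

// Sketch only: select the statement maps of every temp table that may hold
// data in [startTime, endTime]; a DESC query would iterate descendingMap().
Observable<Map<Integer, PreparedStatement>> buckets = Observable.from(
        prepMap.subMap(prepMap.floorKey(startTime), true, endTime, true).values());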

@Override
public <T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
MetricType<T> type = id.getType();

if (order == Order.ASC) {
if (limit <= 0) {
return rxSession.executeAndFetch(dataByDateRangeExclusiveASC[type.getCode()][POS_OF_OLD_DATA]
.bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
getTimeUUID(startTime), getTimeUUID(endTime)).setFetchSize(pageSize))
.doOnError(Throwable::printStackTrace);
} else {
return rxSession.executeAndFetch(dataByDateRangeExclusiveWithLimitASC[type.getCode()][POS_OF_OLD_DATA].bind(
id.getTenantId(), id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime),
getTimeUUID(endTime), limit).setFetchSize(pageSize))
.doOnError(Throwable::printStackTrace);
}
} else {
if (limit <= 0) {
return rxSession.executeAndFetch(dateRangeExclusive[type.getCode()][POS_OF_OLD_DATA].bind(id.getTenantId(),
id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime), getTimeUUID(endTime))
.setFetchSize(pageSize))
.doOnError(Throwable::printStackTrace);
} else {
return rxSession.executeAndFetch(dateRangeExclusiveWithLimit[type.getCode()][POS_OF_OLD_DATA].bind(
id.getTenantId(), id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime),
getTimeUUID(endTime), limit).setFetchSize(pageSize))
.doOnError(Throwable::printStackTrace);
}
}
}
// @Override
// public <T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
// int pageSize) {
// MetricType<T> type = id.getType();
//
// if (order == Order.ASC) {
// if (limit <= 0) {
// return rxSession.executeAndFetch(dataByDateRangeExclusiveASC[type.getCode()][POS_OF_OLD_DATA]
// .bind(id.getTenantId(), id.getType().getCode(), id.getName(), DPART,
// getTimeUUID(startTime), getTimeUUID(endTime)).setFetchSize(pageSize))
// .doOnError(Throwable::printStackTrace);
// } else {
// return rxSession.executeAndFetch(dataByDateRangeExclusiveWithLimitASC[type.getCode()][POS_OF_OLD_DATA].bind(
// id.getTenantId(), id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime),
// getTimeUUID(endTime), limit).setFetchSize(pageSize))
// .doOnError(Throwable::printStackTrace);
// }
// } else {
// if (limit <= 0) {
// return rxSession.executeAndFetch(dateRangeExclusive[type.getCode()][POS_OF_OLD_DATA].bind(id.getTenantId(),
// id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime), getTimeUUID(endTime))
// .setFetchSize(pageSize))
// .doOnError(Throwable::printStackTrace);
// } else {
// return rxSession.executeAndFetch(dateRangeExclusiveWithLimit[type.getCode()][POS_OF_OLD_DATA].bind(
// id.getTenantId(), id.getType().getCode(), id.getName(), DPART, getTimeUUID(startTime),
// getTimeUUID(endTime), limit).setFetchSize(pageSize))
// .doOnError(Throwable::printStackTrace);
// }
// }
// }

@Override
public Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order,
@@ -653,8 +653,8 @@ public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long st
Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

// Calls mostly deprecated methods..
Observable<DataPoint<T>> uncompressedPoints = dataAccess.findOldData(metricId, start, end, limit, safeOrder,
pageSize).map(mapper).doOnError(Throwable::printStackTrace);
// Observable<DataPoint<T>> uncompressedPoints = dataAccess.findOldData(metricId, start, end, limit, safeOrder,
// pageSize).map(mapper).doOnError(Throwable::printStackTrace);

Observable<DataPoint<T>> compressedPoints =
dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
@@ -666,16 +666,16 @@ public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long st

Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(3);
sources.add(uncompressedPoints);
// sources.add(uncompressedPoints);
sources.add(compressedPoints);
sources.add(tempStoragePoints);

// TODO This should be pluggable storage .. where we could do the queries as well. Just make it use
// dataAccess in this Cassandra temporary table solution
// Write an interface that allows all the necessary queries (mm.. kinda like MetricsService then I
// guess.. iiks)

Observable<DataPoint<T>> dataPoints = SortedMerge.create(Arrays.asList(uncompressedPoints,
compressedPoints, tempStoragePoints), comparator, false)
Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
.distinctUntilChanged(
(tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);
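getDataPointComparator is referenced above but not shown in this diff; a minimal sketch of what it presumably returns:

private <T> Comparator<DataPoint<T>> getDataPointComparator(Order order) {
    // Sketch only: order by timestamp in the query's direction so SortedMerge
    // and distinctUntilChanged see a consistent ordering.
    Comparator<DataPoint<T>> ascending =
            (a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp());
    return order == Order.ASC ? ascending : ascending.reversed();
}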
