
Commit 9fa4ab6
Reimplement compression job, uses tokenRange scanning now to avoid extra Cassandra calls
burmanm committed Jul 4, 2017
1 parent 396e0e4 commit 9fa4ab6
Showing 11 changed files with 145 additions and 55 deletions.
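
The heart of this commit is that the temp-table scan now pages over partition-key token ranges, one query per range, instead of issuing extra per-metric Cassandra calls. For reference, below is a minimal sketch of that scanning pattern with the DataStax 3.x Java driver; the class, table name, and helper method are illustrative only, while the real logic sits behind DataAccess.findAllDataFromBucket, which wraps the per-range results in a nested Observable.

import java.util.Set;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TokenRange;

// Illustrative sketch, not part of the change set. The table name is hypothetical.
public class TokenRangeScanSketch {

    public static void scanBucket(Session session) {
        PreparedStatement scan = session.prepare(
                "SELECT tenant_id, type, metric, time, n_value, availability, l_value, tags, " +
                "token(tenant_id, type, metric, dpart) FROM data_temp_0 " +
                "WHERE token(tenant_id, type, metric, dpart) > ? " +
                "AND token(tenant_id, type, metric, dpart) <= ?");

        Set<TokenRange> ranges = session.getCluster().getMetadata().getTokenRanges();
        for (TokenRange range : ranges) {
            // unwrap() splits a range that wraps around the ring into contiguous pieces
            for (TokenRange subRange : range.unwrap()) {
                BoundStatement bs = scan.bind()
                        .setToken(0, subRange.getStart())
                        .setToken(1, subRange.getEnd());
                for (Row row : session.execute(bs)) {
                    // every row of the bucket is visited exactly once, range by range,
                    // without a separate query per metric
                }
            }
        }
    }
}
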
@@ -42,8 +42,6 @@ public class TempDataCompressor implements Func1<JobDetails, Completable> {
private static Logger logger = Logger.getLogger(TempDataCompressor.class);

public static final String JOB_NAME = "TEMP_DATA_COMPRESSOR";
public static final String BLOCK_SIZE = "compression.block.size";
public static final String TARGET_TIME = "compression.time.target";
public static final String CONFIG_ID = "org.hawkular.metrics.jobs." + JOB_NAME;

private static final int DEFAULT_PAGE_SIZE = 1000;
@@ -99,7 +97,7 @@ public Completable call(JobDetails jobDetails) {
Stopwatch stopwatch = Stopwatch.createStarted();
logger.infof("Starting to process temp tables for starting time of %d", startOfSlice);

return metricsService.compressBlock(startOfSlice)
return metricsService.compressBlock(startOfSlice, pageSize)
.doOnCompleted(() ->
logger.info("Finished processing data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
" ms")
@@ -29,6 +29,7 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;

import rx.Completable;
import rx.Observable;

/**
@@ -68,6 +69,13 @@ public interface DataAccess {

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize);

/*
https://issues.apache.org/jira/browse/CASSANDRA-11143
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
Completable resetTempTable(long timestamp);

Observable<Row> findAllMetricsInData();

<T> Observable<Integer> insertData(Observable<Metric<T>> metrics);
@@ -239,7 +239,8 @@ private void prepareTemporaryTableStatements() {
" LIMIT ?";

String scanTableBase =
"SELECT tenant_id, type, metric, time, n_value, availability, l_value, tags FROM %s " +
"SELECT tenant_id, type, metric, time, n_value, availability, l_value, tags, token(tenant_id, type, " +
"metric, dpart) FROM %s " +
"WHERE token(tenant_id, type, metric, dpart) > ? AND token(tenant_id, type, metric, dpart) <=" +
" ?";

@@ -714,17 +715,31 @@ private Set<TokenRange> getTokenRanges() {
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
private Completable resetTempTable(int bucketIndex) {
String fullTableName = String.format(TEMP_TABLE_NAME_FORMAT, bucketIndex);

String reCreateCQL = metadata.getKeyspace(session.getLoggedKeyspace())
.getTable(fullTableName)
.asCQLQuery();

String dropCQL = String.format("DROP TABLE %s", fullTableName);
@Override
public Completable resetTempTable(long timestamp) {
String fullTableName = String.format(TEMP_TABLE_NAME_FORMAT, getBucketIndex(timestamp));

return Completable.fromAction(() -> {
String reCreateCQL = metadata.getKeyspace(session.getLoggedKeyspace())
.getTable(fullTableName)
.asCQLQuery();

String dropCQL = String.format("DROP TABLE %s", fullTableName);

session.execute(dropCQL);
while(!session.getCluster().getMetadata().checkSchemaAgreement()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
session.execute(reCreateCQL);
});
// TODO Needs to reprepare all the preparedstatements after dropping..

return Completable.fromObservable(rxSession.execute(dropCQL))
.andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
// return Completable.fromObservable(rxSession.execute(dropCQL))
// .andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
}

@Override
@@ -808,6 +823,7 @@ private <T> Observable.Transformer<DataPoint<T>, BoundStatement> mapTempInsertSt
MetricId<T> metricId = metric.getMetricId();
return tO -> tO
.map(dataPoint -> {
log.infof("------------> BucketIndex %d", getBucketIndex(dataPoint.getTimestamp()));
BoundStatement bs;
int i = 1;
if (dataPoint.getTags().isEmpty()) {
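
getBucketIndex(timestamp), used above both to pick the temp table to drop and to route inserts, is not part of this diff. As a loose, assumption-labeled sketch, a mapping from timestamp to a rotating bucket index could look like the snippet below; the bucket size and table count are guesses, not values taken from the codebase.

// Hypothetical sketch; the real getBucketIndex lives elsewhere in the data-access layer
// and its bucket size and table count may differ.
private static final long BUCKET_SIZE_MS = 2 * 60 * 60 * 1000L; // assumed two-hour buckets
private static final int NUM_TEMP_TABLES = 12;                  // assumed number of rotating temp tables

private static int getBucketIndex(long timestamp) {
    return (int) ((timestamp / BUCKET_SIZE_MS) % NUM_TEMP_TABLES);
}
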
@@ -175,7 +175,7 @@ Observable<Map<String, Set<String>>> getTagValues(String tenantId, MetricType<?>
<T> Observable<DataPoint<T>> findDataPoints(MetricId<T> id, long start, long end, int limit, Order order,
int pageSize);

@SuppressWarnings("unchecked") Completable compressBlock(long startTimeSlice);
@SuppressWarnings("unchecked") Completable compressBlock(long startTimeSlice, int pageSize);

/**
* Compresses the given range between timestamps to a single block.
@@ -736,19 +736,48 @@ private <T> Observable.Transformer<T, T> applyRetryPolicy() {

@Override
@SuppressWarnings("unchecked")
public Completable compressBlock(long startTimeSlice) {
public Completable compressBlock(long startTimeSlice, int pageSize) {
// TODO Create test that shows this matches the CORRECT bucket always
// TODO Test that this actually fetches everything.. always
return Completable.fromObservable(dataAccess.findAllDataFromBucket(startTimeSlice, defaultPageSize)

return Completable.fromObservable(
Observable.switchOnNext(dataAccess.findAllDataFromBucket(startTimeSlice, pageSize))
.publish(p -> p.window(p.map(Row::getPartitionKeyToken).distinctUntilChanged()))
.concatMap(o -> {
Observable<Row> sharedRows = o.share();
Observable<CompressedPointContainer> compressed = sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
Observable<Row> keyTake = sharedRows.take(1);

return compressed.zipWith(keyTake, (cpc, r) -> {
MetricId<?> metricId =
new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)), r.getString(2));
return dataAccess.insertCompressedData(metricId, startTimeSlice, cpc, getTTL(metricId));
});
})
).doOnCompleted(() -> log.infof("Compress part completed"))
.andThen(dataAccess.resetTempTable(startTimeSlice));
}

@SuppressWarnings("unchecked")
public Completable compressBlockSlow(long startTimeSlice, int pageSize) {
// TODO Create test that shows this matches the CORRECT bucket always
// TODO Test that this actually fetches everything.. always
return Completable.fromObservable(dataAccess.findAllDataFromBucket(startTimeSlice, pageSize)
.compose(applyRetryPolicy())
.concatMap(rO -> rO
.groupBy(r ->
new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)), r.getString(2)))
.doOnNext(g -> log.infof("Starting to process metricId %s\n", g.getKey().toString()))
.concatMap(g ->
g.compose(new TempTableCompressTransformer(g.getKey(), startTimeSlice))
g
.compose(new TempTableCompressTransformer(startTimeSlice))
.doOnNext(cpc -> log.infof("Sending to storage -> %s", g.getKey().toString()))
.concatMap(cpc -> dataAccess.insertCompressedData(g.getKey(), startTimeSlice,
(CompressedPointContainer) cpc, getTTL(g.getKey())))
)));
))

).doOnCompleted(() -> log.infof("Compress part completed"))
.andThen(dataAccess.resetTempTable(startTimeSlice));

// "SELECT tenant_id, type, metric, time, n_value, availability, l_value, tags FROM %s " +

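The rewritten compressBlock leans on the fact that a token-ordered scan returns all rows of one partition (one metric) consecutively, so it can open a new window whenever the partition-key token changes instead of buffering everything with groupBy, which is the approach kept around as compressBlockSlow. The toy RxJava 1.x sketch below shows the same windowing idiom on strings; note that window(boundary) also emits a leading empty window, which the sketch simply filters out.

import rx.Observable;

// Illustrative sketch of the window-on-key-change idiom used by compressBlock.
public class WindowByKeySketch {

    public static void main(String[] args) {
        // Rows are assumed to arrive already sorted by key, as a token-ordered scan is.
        Observable<String> rows = Observable.just("a1", "a2", "b1", "b2", "b3", "c1");

        rows.publish(shared ->
                // open a new window every time the key (first character) changes
                shared.window(shared.map(s -> s.charAt(0)).distinctUntilChanged()))
            .concatMap(Observable::toList)
            .filter(group -> !group.isEmpty())
            .subscribe(System.out::println);
        // prints [a1, a2], then [b1, b2, b3], then [c1]
    }
}
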
@@ -60,7 +60,7 @@ public <E extends Enum<E> & CompressorSetting> Class getSettingsClass() {
}

public enum GorillaSettings implements CompressorSetting {
SECOND_PRECISION((byte) 0x01);
SECOND_PRECISION((byte) 0x01), LONG_VALUES((byte) 0x02);

private byte value;

@@ -22,8 +22,6 @@
import org.hawkular.metrics.core.service.compress.CompressedPointContainer;
import org.hawkular.metrics.core.service.compress.CompressorHeader;
import org.hawkular.metrics.core.service.compress.TagsSerializer;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;

import com.datastax.driver.core.Row;

@@ -36,52 +34,80 @@
*
* @author Michael Burman
*/
public class TempTableCompressTransformer<T> implements Observable.Transformer<Row, CompressedPointContainer> {
public class TempTableCompressTransformer implements Observable.Transformer<Row, CompressedPointContainer> {

private ByteBufferBitOutput out;
private long sliceTimestamp;
private MetricType<T> metricType;
private Compressor compressor;
private TagsSerializer tagsSerializer;
private long timeslice;

public TempTableCompressTransformer(MetricId<T> key, long timeslice) {
out = new ByteBufferBitOutput();
this.metricType = key.getType();
public TempTableCompressTransformer(long timeslice) {
this.timeslice = timeslice;
}

@Override
public Observable<CompressedPointContainer> call(Observable<Row> dataRow) {

// TODO Calculate some relevant statistics of the compressed block?
// Does not really suit the reactive model.. might need two iterations of the data

/*
For values:
Calculate the following statistics:
1. Is the data integers (Gauges can be integers) ?
1.1 If, is the max value <= Long.MAX_VALUE
2. How many values
4. If floating points
4.1 Amount of unique values
4.2 Distribution of the values
=> Select correct predictor (last-value / DFCM)
4.3 Value splitting capability (ISOBAR stuff) ?
=> Proper column split for better compress ratio
=> Gorilla encoding or some other?
4.4 Can we use lossy compression?
=> zfp etc
5. If integers
5.1 Are the values in sorted order?
=> Delta compression
5.2 Repetition of values
=> RLE or let LZ4 do the compression?
5.3 Exception rate .. PFOR or Simple-8 for example
For timestamps we might need another approach..
1. Delta and delta-of-delta distribution
=> Which one to use (probably DoD in monitoring case)
2.
*/
ByteBufferBitOutput out = new ByteBufferBitOutput();

// Write the appropriate header, at first we're stuck to Gorilla only
byte gorillaHeader = CompressorHeader.getHeader(CompressorHeader.Compressor.GORILLA, EnumSet.noneOf
(CompressorHeader.GorillaSettings.class));
out.getByteBuffer().put(gorillaHeader);

this.sliceTimestamp = timeslice;
this.compressor = new Compressor(timeslice, out);
this.tagsSerializer = new TagsSerializer(timeslice);
}
Compressor compressor = new Compressor(timeslice, out);
TagsSerializer tagsSerializer = new TagsSerializer(timeslice);

@Override
public Observable<CompressedPointContainer> call(Observable<Row> dataRow) {
return dataRow.collect(CompressedPointContainer::new,
(container, r) -> {
// "SELECT tenant_id, type, metric, time, n_value, availability, l_value, tags FROM %s " +
long timestamp = r.getTimestamp(3).getTime(); // Check validity
switch(metricType.getCode()) {
switch(r.getByte(1)) {
case 0: // GAUGE
compressor.addValue(timestamp, r.getDouble(4));
break;
case 1: // AVAILABILITY
// TODO Update to newer Gorilla-TSC to fix these - no point storing as FP
// TODO Update to GORILLA_V2 to fix these - no point storing as FP
compressor.addValue(timestamp, ((Byte) r.getByte(4)).doubleValue());
break;
case 2: // COUNTER
// TODO Update to newer Gorilla-TSC to fix these - no point storing as FP
// TODO Update to GORILLA_V2 to fix these - no point storing as FP
compressor.addValue(timestamp, ((Long) r.getLong(4)).doubleValue());
break;
default:
// Not supported yet
throw new RuntimeException("Metric of type " + metricType.getText() + " is not supported " +
"in compression");
throw new RuntimeException("Metric of type " + r.getByte(1) + " is not supported" +
" in compression");
}

// TODO Fix Tags storage!
// if(d.getTags() != null && !d.getTags().isEmpty()) {
// tagsSerializer.addDataPointTags(d.getTimestamp(), d.getTags());
@@ -29,6 +29,7 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;

import rx.Completable;
import rx.Observable;

/**
@@ -120,6 +121,10 @@ public Observable<Row> findMetricsInTempTable(long timestamp) {
return delegate.findAllDataFromBucket(timestamp, pageSize);
}

@Override public Completable resetTempTable(long timestamp) {
return delegate.resetTempTable(timestamp);
}

@Override
public Observable<Row> findAllMetricsInData() {
return delegate.findAllMetricsInData();
@@ -49,6 +49,7 @@
import org.hawkular.metrics.model.Tenant;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@@ -351,7 +352,9 @@ public void addAndFetchGaugeDataAggregates() throws Exception {

@Test
public void addAndCompressData() throws Exception {
DateTime dt = new DateTime(2016, 9, 2, 14, 15); // Causes writes to go to compressed and one uncompressed row
// TODO This unit test works even if compression is broken.. as long as the data is not deleted
DateTime dt = new DateTime(2016, 9, 2, 14, 15, DateTimeZone.UTC); // Causes writes to go to compressed and one
// uncompressed table
DateTimeUtils.setCurrentMillisFixed(dt.getMillis());

DateTime start = dt.minusMinutes(30);
@@ -371,11 +374,15 @@ public void addAndCompressData() throws Exception {
insertObservable.toBlocking().lastOrDefault(null);

DateTime startSlice = DateTimeService.getTimeSlice(start, CompressData.DEFAULT_BLOCK_SIZE);
DateTime endSlice = startSlice.plus(CompressData.DEFAULT_BLOCK_SIZE);
// DateTime endSlice = startSlice.plus(CompressData.DEFAULT_BLOCK_SIZE);

System.out.printf("===================> Processing %d compressing %d\n", start.getMillis(), startSlice.getMillis());

Completable compressCompletable =
metricsService.compressBlock(Observable.just(mId), startSlice.getMillis(), endSlice.getMillis(),
COMPRESSION_PAGE_SIZE, PublishSubject.create()).doOnError(Throwable::printStackTrace);
metricsService.compressBlock(startSlice.getMillis(), COMPRESSION_PAGE_SIZE)
.doOnError(Throwable::printStackTrace);
// metricsService.compressBlock(Observable.just(mId), startSlice.getMillis(), endSlice.getMillis(),
// COMPRESSION_PAGE_SIZE, PublishSubject.create()).doOnError(Throwable::printStackTrace);

TestSubscriber<Void> testSubscriber = new TestSubscriber<>();
compressCompletable.subscribe(testSubscriber);
13 changes: 7 additions & 6 deletions core/rx-java-driver/pom.xml
@@ -37,13 +37,14 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${version.io.netty}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<!--<dependency>-->
<!--<groupId>io.netty</groupId>-->
<!--<artifactId>netty-transport-native-epoll</artifactId>-->
<!--<version>${version.io.netty}</version>-->
<!--<classifier>linux-x86_64</classifier>-->
<!--</dependency>-->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
2 changes: 1 addition & 1 deletion pom.xml
@@ -110,7 +110,7 @@
<version.org.jboss.logging.jboss-logging-tools>2.0.1.Final</version.org.jboss.logging.jboss-logging-tools>
<version.org.infinispan.wildfly>8.0.1.Final</version.org.infinispan.wildfly>
<version.org.slf4j>1.7.7</version.org.slf4j>
<version.io.reactivex.rxjava>1.2.9</version.io.reactivex.rxjava>
<version.io.reactivex.rxjava>1.3.0</version.io.reactivex.rxjava>
<version.io.reactivex.rxjava-math>1.0.0</version.io.reactivex.rxjava-math>
<version.io.reactivex.rxjava-guava>1.0.3</version.io.reactivex.rxjava-guava>
<version.org.codehaus.mojo.findbugs-maven-plugin>3.0.0</version.org.codehaus.mojo.findbugs-maven-plugin>
