Commit

HWKMETRICS-784 (#954)
* compressBlock will check for all possible blocks to compress

* Take the input time from the job, fixes tests

Change the DataAccessITest to use a single version of time

* Add requested logging

* Refactor slightly; add target subject error propagation

* flatMap -> concatMap
John Sanda committed Apr 20, 2018
1 parent ea834ea commit 563c336
Showing 7 changed files with 71 additions and 39 deletions.
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
+ * Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -89,6 +89,7 @@ public Completable call(JobDetails jobDetails) {

// TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
return metricsService.compressBlock(startOfSlice, pageSize, maxReadConcurrency)
+                .doOnError(t -> logger.errorf("Compression job failed: %s", t.getMessage()))
.doOnCompleted(() -> {
stopwatch.stop();
logger.info("Finished processing data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
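Note on the doOnError hook added above: it is a side effect only and does not swallow the failure. A minimal standalone sketch (assuming RxJava 1.x; the logger call and message are stand-ins, not the project's code) showing that the error is logged and still reaches the subscriber:

    import rx.Observable;

    public class DoOnErrorSketch {
        public static void main(String[] args) {
            Observable.<Integer>error(new RuntimeException("boom"))
                    // Side effect only: log the failure without consuming it
                    .doOnError(t -> System.err.println("Compression job failed: " + t.getMessage()))
                    .subscribe(
                            i -> { }, // onNext: never invoked for a failed stream
                            t -> System.err.println("subscriber still sees: " + t.getMessage()));
        }
    }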
@@ -72,6 +72,8 @@ public interface DataAccess {
*/
// Completable resetTempTable(long timestamp);

+    Set<Long> findExpiredTables(long startTime);
+
Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency);

Observable<ResultSet> dropTempTable(long timestamp);
@@ -786,6 +786,20 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType<T> type) {
return rxSession.executeAndFetch(readMetricsIndex.bind(tenantId, type.getCode()));
}

+    @Override
+    public Set<Long> findExpiredTables(long startTime) {
+        Long currentTableKey = prepMap.floorKey(startTime);
+        NavigableMap<Long, Map<Integer, PreparedStatement>> expiredTempMap =
+                prepMap.subMap(0L, false, currentTableKey, true);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Returning expired keys from %d to %d, in total %d items",
+                    expiredTempMap.firstEntry().getKey(),
+                    expiredTempMap.lastEntry().getKey(),
+                    expiredTempMap.size())); // No idea why debugf throws compiler issues
+        }
+        return expiredTempMap.keySet();
+    }
+
/**
* Fetch all the data from a temporary table for the compression job. Using TokenRanges avoids fetching first
* all the metrics' partition keys and then requesting them.
@@ -810,7 +824,7 @@ public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {

return Observable.from(getTokenRanges())
.map(tr -> rxSession.executeAndFetch(
-                        getTempStatement(MetricType.UNDEFINED, TempStatement.SCAN_WITH_TOKEN_RANGES, timestamp)
+                        ts
.bind()
.setToken(0, tr.getStart())
.setToken(1, tr.getEnd())
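findExpiredTables leans on two NavigableMap operations. A standalone sketch of their semantics (hypothetical bucket keys, String values standing in for the prepared-statement maps): floorKey picks the newest bucket at or before the job's start time, and subMap(0, exclusive, floorKey, inclusive) returns every bucket old enough to compress, including the current one.

    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.TreeMap;

    public class ExpiredTablesSketch {
        public static void main(String[] args) {
            // Hypothetical bucket keys standing in for prepMap's table timestamps
            NavigableMap<Long, String> prepMap = new TreeMap<>();
            prepMap.put(1000L, "temp_1000");
            prepMap.put(2000L, "temp_2000");
            prepMap.put(3000L, "temp_3000");

            long startTime = 2500L;
            Long currentTableKey = prepMap.floorKey(startTime); // 2000: newest key <= startTime
            // 0L exclusive .. currentTableKey inclusive => every bucket due for compression
            Set<Long> expired = prepMap.subMap(0L, false, currentTableKey, true).keySet();
            System.out.println(expired); // prints [1000, 2000]; 3000 is still in the future
        }
    }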
@@ -764,38 +764,41 @@ public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateT

@Override
@SuppressWarnings("unchecked")
-    public Completable compressBlock(long startTimeSlice, int pageSize, int maxConcurrency) {
+    public Completable compressBlock(long jobStartTimeSlice, int pageSize, int maxConcurrency) {
return Completable.fromObservable(
-        dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
-                .switchIfEmpty(Observable.empty())
-                .flatMap(rows -> rows
-                        // Each time the tokenrange changes inside the query, create new window, publish allows
-                        // reuse of the observable in two distinct processing phases
-                        .publish(p -> p.window(
-                                p.map(Row::getPartitionKeyToken)
-                                        .distinctUntilChanged()))
-                        // ConcatMap so we don't mess the order as that's important in the compression job
-                        .concatMap(o -> {
-                            // Cache the first key from the observable so we can use it to create a key later
-                            Observable<Row> sharedRows = o.share();
-                            Observable<CompressedPointContainer> compressed =
-                                    sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
-                            Observable<Row> keyTake = sharedRows.take(1);
-
-                            // Merge the first row with the compressed package to be able to write to Cassandra
-                            return compressed.zipWith(keyTake, (cpc, r) -> {
-                                MetricId<?> metricId =
-                                        new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)),
-                                                r.getString(2));
-                                return dataAccess.insertCompressedData(metricId, startTimeSlice, cpc,
-                                        getTTL(metricId));
-                            });
-                        }), maxConcurrency)
-                .flatMap(rs -> rs)
-                .doOnCompleted(() -> dataAccess.dropTempTable(startTimeSlice)
-                        .compose(applyRetryPolicy())
-                        .subscribeOn(Schedulers.io())
-                        .subscribe())
+        Observable.from(dataAccess.findExpiredTables(jobStartTimeSlice))
+                .concatMap(startTimeSlice ->
+                        dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
+                                .switchIfEmpty(Observable.empty())
+                                .flatMap(rows -> rows
+                                        // Each time the tokenrange changes inside the query, create new window, publish allows
+                                        // reuse of the observable in two distinct processing phases
+                                        .publish(p -> p.window(
+                                                p.map(Row::getPartitionKeyToken)
+                                                        .distinctUntilChanged()))
+                                        // ConcatMap so we don't mess the order as that's important in the compression job
+                                        .concatMap(o -> {
+                                            // Cache the first key from the observable so we can use it to create a key later
+                                            Observable<Row> sharedRows = o.share();
+                                            Observable<CompressedPointContainer> compressed =
+                                                    sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
+                                            Observable<Row> keyTake = sharedRows.take(1);
+
+                                            // Merge the first row with the compressed package to be able to write to Cassandra
+                                            return compressed.zipWith(keyTake, (cpc, r) -> {
+                                                MetricId<?> metricId =
+                                                        new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)),
+                                                                r.getString(2));
+                                                return dataAccess.insertCompressedData(metricId, startTimeSlice, cpc,
+                                                        getTTL(metricId));
+                                            });
+                                        }), maxConcurrency)
+                                .flatMap(rs -> rs)
+                                .doOnCompleted(() -> dataAccess.dropTempTable(startTimeSlice)
+                                        .compose(applyRetryPolicy())
+                                        .subscribeOn(Schedulers.io())
+                                        .subscribe())
+                )
);
}

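The new outer Observable.from(...).concatMap(...) is what lets one job run compress every expired bucket while keeping them strictly in sequence. A minimal sketch (assuming RxJava 1.x, hypothetical bucket keys) of the ordering guarantee the in-code comment refers to: concatMap subscribes to one inner Observable at a time, so buckets complete in key order even when the inner work hops threads, whereas flatMap would let them interleave.

    import java.util.Arrays;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class ConcatMapOrderSketch {
        public static void main(String[] args) {
            Observable.from(Arrays.asList(1000L, 2000L, 3000L))
                    // One bucket at a time: the next inner Observable is not
                    // subscribed until the previous one completes
                    .concatMap(bucket -> Observable.just("compressed bucket " + bucket)
                            .subscribeOn(Schedulers.computation()))
                    .toBlocking()
                    .forEach(System.out::println); // always 1000, 2000, 3000, in order
        }
    }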
@@ -22,7 +22,6 @@
import static org.hawkular.metrics.model.AvailabilityType.UP;
import static org.hawkular.metrics.model.MetricType.AVAILABILITY;
import static org.hawkular.metrics.model.MetricType.GAUGE;
-import static org.joda.time.DateTime.now;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

@@ -31,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.hawkular.metrics.core.service.transformers.MetricIdentifierFromFullDataRowTransformer;
+import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
@@ -63,6 +63,7 @@ public class DataAccessITest extends BaseITest {
private static int DEFAULT_PAGE_SIZE = 5000;

private DataAccessImpl dataAccess;
+    private static final DateTime now = DateTimeService.now.get();

private PreparedStatement truncateTenants;
private PreparedStatement truncateGaugeData;
@@ -71,7 +72,7 @@

@BeforeClass
public void initClass() {
-        this.dataAccess = (DataAccessImpl) TestDataAccessFactory.newInstance(session);
+        this.dataAccess = (DataAccessImpl) TestDataAccessFactory.newInstance(session, now());

truncateTenants = session.prepare("TRUNCATE tenants");
truncateGaugeData = session.prepare("TRUNCATE data");
@@ -229,7 +230,11 @@ public void findAllMetricsPartitionKeys() throws Exception {
assertEquals(metrics.size(), 4);
}

-    @Test(enabled = false)
+    private static DateTime now() {
+        return new DateTime(now);
+    }
+
+    @Test
void testFindAllDataFromBucket() throws Exception {
String tenantId = "t1";
long start = now().getMillis();
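The test changes above all serve one point: the factory that creates the temp tables and the tests that write into them must agree on "now". A standalone sketch of the race this removes (the 2-hour bucket width is an assumption taken from the plus/minusHours(2) calls in tableListForTesting, not a project constant): two independent clock reads can straddle a bucket boundary, so the test would write to a table the factory never created.

    import java.time.Instant;

    public class PinnedClockSketch {
        // Assumed bucket width; see the lead-in above
        static final long BUCKET_MILLIS = 2 * 60 * 60 * 1000L;

        static long bucket(long millis) {
            return (millis / BUCKET_MILLIS) * BUCKET_MILLIS; // floor to bucket start
        }

        public static void main(String[] args) throws InterruptedException {
            // Race the test change removes: two separate clock reads
            long factoryNow = Instant.now().toEpochMilli(); // used to create tables
            Thread.sleep(5);                                // schema setup happens here
            long testNow = Instant.now().toEpochMilli();    // used to write data
            System.out.println(bucket(factoryNow) == bucket(testNow)); // usually true, not guaranteed

            // Fix mirrored by the diff: read the clock once, pass the value everywhere
            long pinned = Instant.now().toEpochMilli();
            System.out.println(bucket(pinned)); // every consumer derives from one value
        }
    }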
@@ -112,6 +112,10 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType<T> type) {
return delegate.findMetricsInMetricsIndex(tenantId, type);
}

+    @Override public Set<Long> findExpiredTables(long startTime) {
+        return delegate.findExpiredTables(startTime);
+    }
+
@Override public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
return delegate.findAllDataFromBucket(timestamp, pageSize, maxConcurrency);
}
@@ -40,6 +40,10 @@ public class TestDataAccessFactory {
private static final CoreLogger log = CoreLogging.getCoreLogger(TestDataAccessFactory.class);

public static DataAccess newInstance(Session session) {
+        return newInstance(session, DateTimeService.now.get());
+    }
+
+    public static DataAccess newInstance(Session session, DateTime now) {
session.execute(String.format("USE %s", BaseITest.getKeyspace()));
final CountDownLatch latch = new CountDownLatch(3);
final CountDownLatch fallBackTable = new CountDownLatch(0);
@@ -55,7 +59,7 @@ void prepareTempStatements(String tableName, Long mapKey) {
}
}
};
-        dataAccess.createTempTablesIfNotExists(tableListForTesting())
+        dataAccess.createTempTablesIfNotExists(tableListForTesting(now))
.subscribeOn(Schedulers.io())
.toBlocking().subscribe();
try {
@@ -70,9 +74,8 @@
/**
* Create few temporary tables for tests
*/
-    static Set<Long> tableListForTesting() {
+    static Set<Long> tableListForTesting(DateTime now) {
Set<Long> tempTables = new HashSet<>(3);
-        DateTime now = DateTimeService.now.get();
tempTables.add(now.getMillis());
tempTables.add(now.minusHours(2).getMillis());
tempTables.add(now.plusHours(2).getMillis());
