Commit a194b37

Travis?

burmanm committed Jul 4, 2017
1 parent e0bce48 commit a194b37
Showing 8 changed files with 154 additions and 164 deletions.
@@ -42,7 +42,7 @@ public class TempDataCompressor implements Func1<JobDetails, Completable> {
private static Logger logger = Logger.getLogger(TempDataCompressor.class);

public static final String JOB_NAME = "TEMP_DATA_COMPRESSOR";
public static final String CONFIG_ID = JobsServiceImpl.CONFIG_PREFIX + "temp.table.compressor";
public static final String CONFIG_ID = JobsServiceImpl.CONFIG_PREFIX + JOB_NAME;
public static final String CONFIG_PAGE_SIZE = "page-size";
public static final String CONFIG_MAX_READ_CONCURRENCY = "concurrency.read.max";

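The change above ties the configuration id to JOB_NAME instead of a separately maintained dotted key, so the two constants can no longer drift apart. A minimal sketch of what resolves at runtime — the concrete CONFIG_PREFIX value is an assumption here, since it is not shown in this diff:

    // Illustrative only; the prefix value is assumed, not part of the commit.
    // Before: CONFIG_PREFIX + "temp.table.compressor"
    // After:  CONFIG_PREFIX + JOB_NAME  ->  e.g. "<prefix>TEMP_DATA_COMPRESSOR"
    String configId = JobsServiceImpl.CONFIG_PREFIX + TempDataCompressor.JOB_NAME;

One practical consequence: any value already persisted under the old id would no longer be found under the new one.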
@@ -30,7 +30,6 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;

import rx.Completable;
import rx.Observable;

/**
@@ -77,7 +76,7 @@ public interface DataAccess {

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency);

Completable dropTempTable(long timestamp);
Observable<ResultSet> dropTempTable(long timestamp);

Observable<Row> findAllMetricsInData();

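Changing the return type from Completable to Observable<ResultSet> hands the caller the raw driver result and, more importantly, the choice of when and on which scheduler to subscribe. A hypothetical caller sketch, using only RxJava 1.x operators that appear elsewhere in this commit:

    // Sketch; the logging call is illustrative, not from the commit.
    dataAccess.dropTempTable(timestamp)
            .subscribeOn(Schedulers.io())       // keep the DROP off the caller's thread
            .subscribe(
                    resultSet -> { /* DROP TABLE acknowledged */ },
                    t -> logger.error("Failed to drop temp table", t));

Callers that still want the old semantics can wrap the result in Completable.fromObservable(...), exactly as the previous implementation did.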
@@ -84,7 +84,6 @@
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.LoadBalancingPolicy;

import rx.Completable;
import rx.Observable;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;
@@ -540,79 +539,6 @@ private void initializeTemporaryTableStatements() {
}
prepMap.put(0L, statementMap); // Fall back is always at value 0 (floorKey/floorEntry will hit it)
}
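The fallback comment deserves unpacking: prepared statements appear to be kept in a navigable map keyed by each temp table's start timestamp, and lookups go through floorKey/floorEntry, which return the greatest key less than or equal to the requested timestamp. Seeding key 0L therefore guarantees that a lookup for any timestamp resolves to something. A condensed sketch under those assumptions (the concrete map type is not visible in this hunk):

    // Sketch: floorEntry(ts) picks the newest table whose start <= ts;
    // the entry at 0L is the catch-all for timestamps older than any temp table.
    NavigableMap<Long, Map<Integer, PreparedStatement>> prepMap = new ConcurrentSkipListMap<>();
    prepMap.put(0L, statementMap);                  // fallback, as in the code above
    Map<Integer, PreparedStatement> hit = prepMap.floorEntry(timestamp).getValue();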

// These are for the ring buffer strategy (slightly faster but Cassandra 3.x has issues with consistent schema)

// Read statements
// dateRangeExclusive = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dateRangeExclusiveWithLimit = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dataByDateRangeExclusiveASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// dataByDateRangeExclusiveWithLimitASC = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES+1];
// scanTempTableWithTokens = new PreparedStatement[NUMBER_OF_TEMP_TABLES];
//
// // Insert statements
// dataTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];
// dataWithTagsTable = new PreparedStatement[MetricType.all().size()][NUMBER_OF_TEMP_TABLES];
//
// // MetricDefinition statements
// metricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];
// allMetricsInDatas = new PreparedStatement[NUMBER_OF_TEMP_TABLES+1];
//
// // Initialize all the temporary tables for inserts
// for (MetricType<?> metricType : MetricType.all()) {
// for(int k = 0; k < NUMBER_OF_TEMP_TABLES; k++) {
// // String metrics are not yet supported with temp tables
// if(metricType.getCode() == 4) {
// continue;
// }
//
// String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, k);
//
// // Insert statements
// dataTable[metricType.getCode()][k] = session.prepare(
// String.format(data, tempTableName, metricTypeToColumnName(metricType))
// );
// dataWithTagsTable[metricType.getCode()][k] = session.prepare(
// String.format(dataWithTags, tempTableName, metricTypeToColumnName(metricType))
// );
// // Read statements
// dateRangeExclusive[metricType.getCode()][k] = session.prepare(
// String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dateRangeExclusiveWithLimit[metricType.getCode()][k] = session.prepare(
// String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dataByDateRangeExclusiveASC[metricType.getCode()][k] = session.prepare(
// String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), tempTableName)
// );
// dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][k] = session.prepare(
// String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), tempTableName)
// );
// }
// // Then initialize the old fashion ones..
// dateRangeExclusive[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(byDateRangeExclusiveBase, metricTypeToColumnName(metricType), "data")
// );
// dateRangeExclusiveWithLimit[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dateRangeExclusiveWithLimitBase, metricTypeToColumnName(metricType), "data")
// );
// dataByDateRangeExclusiveASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dataByDateRangeExclusiveASCBase, metricTypeToColumnName(metricType), "data")
// );
// dataByDateRangeExclusiveWithLimitASC[metricType.getCode()][POS_OF_OLD_DATA] = session.prepare(
// String.format(dataByDateRangeExclusiveWithLimitASCBase, metricTypeToColumnName(metricType), "data")
// );
// }
//
// // MetricDefinition statements
// for(int i = 0; i < NUMBER_OF_TEMP_TABLES; i++) {
// String tempTableName = String.format(TEMP_TABLE_NAME_FORMAT, i);
// metricsInDatas[i] = session.prepare(String.format(findMetricInDataBase, tempTableName));
// allMetricsInDatas[i] = session.prepare(String.format(findAllMetricsInDataBases, tempTableName));
// scanTempTableWithTokens[i] = session.prepare(String.format(scanTableBase, tempTableName));
// }
// metricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findMetricInDataBase, "data"));
// allMetricsInDatas[POS_OF_OLD_DATA] = session.prepare(String.format(findAllMetricsInDataBases, "data"));
}

private String metricTypeToColumnName(MetricType<?> type) {
@@ -980,9 +906,6 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
*/
@Override
public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
// TODO This is making multiple requests because of the getTokenRanges() .. I should recreate fewer amount of
// queries

return Observable.from(getTokenRanges())
.map(tr -> rxSession.executeAndFetch(
getTempStatement(MetricType.UNDEFINED, TempStatement.SCAN_WITH_TOKEN_RANGES, timestamp)
@@ -1000,44 +923,13 @@ private Set<TokenRange> getTokenRanges() {
return tokenRanges;
}
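getTokenRanges() drives the full-bucket scan in findAllDataFromBucket above: one query is issued per token range and each range produces its own row stream, which is why the method returns Observable<Observable<Row>> and leaves the merge to the caller. A condensed consumer sketch, mirroring how compressBlock and the new integration test use it (process is a hypothetical callback):

    // Sketch: flatten the per-token-range streams, at most maxConcurrency in flight.
    dataAccess.findAllDataFromBucket(timestamp, pageSize, maxConcurrency)
            .flatMap(rows -> rows, maxConcurrency)
            .subscribe(row -> process(row));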

@Override public Completable dropTempTable(long timestamp) {
@Override
public Observable<ResultSet> dropTempTable(long timestamp) {
String fullTableName = getTempTableName(timestamp);
String dropCQL = String.format("DROP TABLE %s", fullTableName);
return Completable.fromObservable(rxSession.execute(dropCQL));
return rxSession.execute(dropCQL);
}
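Unlike the inserts and reads above, DROP TABLE is a schema change, so it is subject to cluster-wide schema agreement rather than a per-query consistency level; the Cassandra issues linked just below, and the commented-out resetTempTable variant this commit deletes, show why a drop-and-recreate cycle was abandoned. The agreement check the removed code performed looked like this (condensed from the deleted block):

    session.execute(String.format("DROP TABLE %s", fullTableName));
    // Poll until every node reports the same schema version before reusing the name.
    while (!session.getCluster().getMetadata().checkSchemaAgreement()) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }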

/*
https://issues.apache.org/jira/browse/CASSANDRA-11143
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
// @Override
// public Completable resetTempTable(long timestamp) {
// String fullTableName = String.format(TEMP_TABLE_NAME_FORMAT, getBucketIndex(timestamp));
//
// return Completable.fromAction(() -> {
// String reCreateCQL = metadata.getKeyspace(session.getLoggedKeyspace())
// .getTable(fullTableName)
// .asCQLQuery();
//
// String dropCQL = String.format("DROP TABLE %s", fullTableName);
//
// session.execute(dropCQL);
// while(!session.getCluster().getMetadata().checkSchemaAgreement()) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// session.execute(reCreateCQL);
// });
// // TODO Needs to reprepare all the preparedstatements after dropping..
//
//// return Completable.fromObservable(rxSession.execute(dropCQL))
//// .andThen(Completable.fromObservable(rxSession.execute(reCreateCQL)));
// }

private Observable<PreparedStatement> getPrepForAllTempTables(TempStatement ts) {
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
@@ -1322,34 +1214,6 @@ public Observable<Row> findCompressedData(MetricId<?> id, long startTime, long e
}
}

// private Integer[] tempBuckets(long startTime, long endTime, Order order) {
// ZonedDateTime endZone = ZonedDateTime.ofInstant(Instant.ofEpochMilli(endTime), UTC)
// .with(DateTimeService.startOfPreviousEvenHour());
//
// ZonedDateTime startZone = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), UTC)
// .with(DateTimeService.startOfPreviousEvenHour());
//
// // Max time back is <24 hours.
// if(startZone.isBefore(endZone.minus(23, ChronoUnit.HOURS))) {
// startZone = endZone.minus(23, ChronoUnit.HOURS);
// }
//
// ConcurrentSkipListSet<Integer> buckets = new ConcurrentSkipListSet<>();
//
// while(startZone.isBefore(endZone)) {
// buckets.add(getBucketIndex(startZone.toInstant().toEpochMilli()));
// startZone = startZone.plus(1, ChronoUnit.HOURS);
// }
//
// buckets.add(getBucketIndex(endZone.toInstant().toEpochMilli()));
//
// if(order == Order.DESC) {
// return buckets.descendingSet().stream().toArray(Integer[]::new);
// } else {
// return buckets.stream().toArray(Integer[]::new);
// }
// }

private SortedMap<Long, Map<Integer, PreparedStatement>> subSetMap(long startTime, long endTime, Order order) {
Long startKey = prepMap.floorKey(startTime);
Long endKey = prepMap.floorKey(endTime);
@@ -107,6 +107,7 @@
import rx.functions.Func1;
import rx.functions.Func6;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/**
@@ -732,24 +733,34 @@ public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateT
public Completable compressBlock(long startTimeSlice, int pageSize, int maxConcurrency) {
return Completable.fromObservable(
dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
.flatMap(rows -> rows
.publish(p -> p.window(
p.map(Row::getPartitionKeyToken)
.distinctUntilChanged()))
.concatMap(o -> {
Observable<Row> sharedRows = o.share();
Observable<CompressedPointContainer> compressed = sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
Observable<Row> keyTake = sharedRows.take(1);

return compressed.zipWith(keyTake, (cpc, r) -> {
MetricId<?> metricId =
new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)), r.getString(2));
return dataAccess.insertCompressedData(metricId, startTimeSlice, cpc, getTTL(metricId))
.mergeWith(updateMetricExpiration(metricId).map(rs -> null));
});
}), maxConcurrency)
).doOnCompleted(() -> log.infof("Compress part completed"))
.andThen(dataAccess.dropTempTable(startTimeSlice));
.flatMap(rows -> rows
.publish(p -> p.window(
p.map(Row::getPartitionKeyToken)
.distinctUntilChanged()))
.concatMap(o -> {
Observable<Row> sharedRows = o.share();
Observable<CompressedPointContainer> compressed =
sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
Observable<Row> keyTake = sharedRows.take(1);

return compressed.zipWith(keyTake, (cpc, r) -> {
MetricId<?> metricId =
new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)),
r.getString(2));
return dataAccess
.insertCompressedData(metricId, startTimeSlice, cpc, getTTL(metricId))
.mergeWith(updateMetricExpiration(metricId).map(rs -> null));
});
}), maxConcurrency)
.flatMap(rs -> rs)
.doOnError(Throwable::printStackTrace)
.doOnCompleted(() -> {
log.infof("Compress part completed");
dataAccess.dropTempTable(startTimeSlice)
.subscribeOn(Schedulers.io())
.subscribe();
})
);
}
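Note the behavioral change in the rewritten chain: previously the drop was sequenced with .andThen(...), so the returned Completable only completed once the temp table was gone; now the drop is fired as a side effect from doOnCompleted and subscribed on the io scheduler, so completion of compressBlock no longer guarantees the DROP TABLE has finished. The two patterns side by side, with compressWork as a hypothetical stand-in for the row pipeline above:

    // 1) Sequenced (old): the Completable waits for the drop round trip.
    Completable sequenced = compressWork
            .andThen(Completable.fromObservable(dataAccess.dropTempTable(startTimeSlice)));

    // 2) Fire-and-forget (new): the drop runs detached on Schedulers.io().
    Completable detached = compressWork.doOnCompleted(() ->
            dataAccess.dropTempTable(startTimeSlice)
                    .subscribeOn(Schedulers.io())
                    .subscribe());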

@Override
@@ -102,6 +102,7 @@ public void initClass() {
metricsService.setDataAccess(dataAccess);
metricsService.setConfigurationService(configurationService);
metricsService.startUp(session, getKeyspace(), true, metricRegistry);
dataAccess.shutdown();
}

@BeforeMethod
@@ -114,7 +115,10 @@ public void initTest(Method method) {

// To recreate the temporary tables
dataAccess = TestDataAccessFactory.newInstance(session);
// metricsService = new MetricsServiceImpl();
metricsService.setDataAccess(dataAccess);
// metricsService.setConfigurationService(configurationService);
// metricsService.startUp(session, getKeyspace(), false, metricRegistry);

long nextStart = LocalDateTime.ofInstant(Instant.ofEpochMilli(jobScheduler.now()), ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
@@ -126,7 +130,10 @@ public void initTest(Method method) {
jobsService.setScheduler(jobScheduler);
jobsService.setMetricsService(metricsService);
jobsService.setConfigurationService(configurationService);
compressionJob = jobsService.start().stream().filter(details -> details.getJobName().equals(JOB_NAME))
compressionJob = jobsService
.start()
.stream()
.filter(details -> details.getJobName().equals(JOB_NAME))
.findFirst().get();

jobScheduler.advanceTimeTo(nextStart);
@@ -136,6 +143,7 @@
@AfterMethod(alwaysRun = true)
public void tearDown() {
jobsService.shutdown();
// metricsService.shutdown();
dataAccess.shutdown();
}

@@ -339,7 +347,6 @@ public void testCompressRetentionIndex() throws Exception {
new DataPoint<>(end.getMillis(), 4.4)));
testCompressResults(GAUGE, m1, start);


assertNotNull(dataAccess.findMetricExpiration(m1.getMetricId()).toBlocking().firstOrDefault(null));
assertNull(dataAccess.findMetricExpiration(m2.getMetricId()).toBlocking().firstOrDefault(null));
}
@@ -25,16 +25,22 @@
import static org.joda.time.DateTime.now;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.service.transformers.MetricFromFullDataRowTransformer;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.DataPoint;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.model.Tenant;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -47,7 +53,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import rx.Emitter;
import rx.Observable;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

/**
* @author John Sanda
@@ -218,6 +228,47 @@ public void findAllMetricsPartitionKeys() throws Exception {
assertEquals(metrics.size(), 4);
}

@Test
void testFindAllDataFromBucket() throws Exception {
String tenantId = "t1";
long start = now().getMillis();

int amountOfMetrics = 1000;
int datapointsPerMetric = 10;

for (int j = 0; j < datapointsPerMetric; j++) {
final int dpAdd = j;
Observable<Metric<Double>> metrics = Observable.create(emitter -> {
for (int i = 0; i < amountOfMetrics; i++) {
String metricName = String.format("m%d", i);
MetricId<Double> mId = new MetricId<>(tenantId, GAUGE, metricName);
emitter.onNext(new Metric<>(mId, asList(new DataPoint<>(start + dpAdd, 1.1))));
}
emitter.onCompleted();
}, Emitter.BackpressureMode.BUFFER);

TestSubscriber<Integer> subscriber = new TestSubscriber<>();
Observable<Integer> observable = dataAccess.insertData(metrics);
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(20, TimeUnit.SECONDS); // For Travis..
for (Throwable throwable : subscriber.getOnErrorEvents()) {
throwable.printStackTrace();
}
subscriber.assertNoErrors();
subscriber.assertCompleted();
}

Observable<Row> rowObservable = dataAccess.findAllDataFromBucket(start, DEFAULT_PAGE_SIZE, 2)
.flatMap(r -> r);

TestSubscriber<Row> tsr = new TestSubscriber<>();
rowObservable.subscribe(tsr);
tsr.awaitTerminalEvent(100, TimeUnit.SECONDS); // Travis again
tsr.assertCompleted();
tsr.assertNoErrors();
tsr.assertValueCount(amountOfMetrics * datapointsPerMetric);
}
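The closing assertions follow the usual RxJava 1.x TestSubscriber recipe: awaitTerminalEvent with a deliberately generous timeout (the "For Travis.." comments, and likely the commit title itself, point at slow CI workers), then assertCompleted, assertNoErrors, and a value-count check. The expected count is simply amountOfMetrics * datapointsPerMetric = 1000 * 10 = 10,000 rows read back from the bucket.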

// @Test
// public void testBucketIndexes() throws Exception {
// ZonedDateTime of = ZonedDateTime.of(2017, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
