Skip to content

Commit

Permalink
Configurable maxConcurrency for compression job, fix dual preparing o…
Browse files Browse the repository at this point in the history
…f certain MetricTypes
  • Loading branch information
burmanm committed Jul 4, 2017
1 parent 3d468fd commit e0bce48
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
*/
public class JobsServiceImpl implements JobsService, JobsServiceImplMBean {

public static final String CONFIG_PREFIX = "org.hawkular.metrics.jobs.";

private static Logger logger = Logger.getLogger(JobsServiceImpl.class);

private Scheduler scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,30 @@ public class TempDataCompressor implements Func1<JobDetails, Completable> {
private static Logger logger = Logger.getLogger(TempDataCompressor.class);

public static final String JOB_NAME = "TEMP_DATA_COMPRESSOR";
public static final String CONFIG_ID = "org.hawkular.metrics.jobs." + JOB_NAME;
public static final String CONFIG_ID = JobsServiceImpl.CONFIG_PREFIX + "temp.table.compressor";
public static final String CONFIG_PAGE_SIZE = "page-size";
public static final String CONFIG_MAX_READ_CONCURRENCY = "concurrency.read.max";

private static final int DEFAULT_PAGE_SIZE = 1000;
private static final int DEFAULT_READ_CONCURRENCY = 2;

private MetricsService metricsService;

private int pageSize;
private boolean enabled;
private int maxReadConcurrency = DEFAULT_READ_CONCURRENCY;

public TempDataCompressor(MetricsService service, ConfigurationService configurationService) {
metricsService = service;
Configuration configuration = configurationService.load(CONFIG_ID).toSingle().toBlocking().value();
if (configuration.get("page-size") == null) {
if (configuration.get(CONFIG_PAGE_SIZE) == null) {
pageSize = DEFAULT_PAGE_SIZE;
} else {
pageSize = Integer.parseInt(configuration.get("page-size"));
pageSize = Integer.parseInt(configuration.get(CONFIG_PAGE_SIZE));
}

if(configuration.get(CONFIG_MAX_READ_CONCURRENCY) != null) {
maxReadConcurrency = Integer.parseInt(configuration.get(CONFIG_MAX_READ_CONCURRENCY));
}

String enabledConfig = configuration.get("enabled", "true");
Expand All @@ -80,7 +88,7 @@ public Completable call(JobDetails jobDetails) {
logger.infof("Starting to process temp table for starting time of %d", startOfSlice);

// TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
return metricsService.compressBlock(startOfSlice, pageSize)
return metricsService.compressBlock(startOfSlice, pageSize, maxReadConcurrency)
.doOnCompleted(() -> {
stopwatch.stop();
logger.info("Finished processing data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ public Completable call(JobDetails jobDetails) {
ZonedDateTime lastMaintainedBlock = currentBlock.plus(forwardTime);

return service.verifyAndCreateTempTables(currentBlock, lastMaintainedBlock)
.doOnCompleted(() -> logger.infof("Temporary tables are valid until %s", forwardTime.toString()));
.doOnCompleted(() -> logger.infof("Temporary tables are valid until %s", lastMaintainedBlock.toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ public interface DataAccess {

<T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType<T> type);

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize);

/*
https://issues.apache.org/jira/browse/CASSANDRA-11143
https://issues.apache.org/jira/browse/CASSANDRA-10699
https://issues.apache.org/jira/browse/CASSANDRA-9424
*/
// Completable resetTempTable(long timestamp);

Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency);

Completable dropTempTable(long timestamp);

Observable<Row> findAllMetricsInData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ void prepareTempStatements(String tableName) {
prepMap.put(mapKey, statementMap);

// Per metricType
for (MetricType<?> metricType : MetricType.all()) {
for (MetricType<?> metricType : MetricType.userTypes()) {
if(metricType == STRING) { continue; } // We don't support String metrics in temp tables yet
for (TempStatement st : TempStatement.values()) {
Integer key = getMapKey(metricType, st);
Expand Down Expand Up @@ -451,7 +451,7 @@ public Observable<ResultSet> createTempTablesIfNotExists(final Set<Long> timesta
})

.flatMapIterable(s -> s)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (st, l) -> st)
.zipWith(Observable.interval(300, TimeUnit.MILLISECONDS), (st, l) -> st)
.concatMap(this::createTemporaryTable);
}

Expand Down Expand Up @@ -974,12 +974,12 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
* Performance can be improved by using data locality and fetching with multiple threads.
*
* @param timestamp A timestamp inside the wanted bucket (such as the previous starting row timestamp)
* @param pageSize How many rows to fetch each time
* @param maxConcurrency Into how many streams the token ranges should be split
* @return Observable of Observables per partition key
*/
@Override
public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize) {
// int bucket = getBucketIndex(timestamp);

public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
// TODO This is making multiple requests because of the getTokenRanges() .. I should recreate fewer amount of
// queries

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> id, long start, long end

Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateTime endTime);

@SuppressWarnings("unchecked") Completable compressBlock(long startTimeSlice, int pageSize);
/**
 * Compresses the data of the time slice identified by {@code startTimeSlice}.
 *
 * NOTE(review): in the implementation shown in this change the rows are read with
 * {@code findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)} and the temp table for
 * {@code startTimeSlice} is dropped once compression has completed - confirm for other implementations.
 *
 * @param startTimeSlice Timestamp identifying the time slice to compress (presumably the start of the
 *                       slice; units are not visible here - TODO confirm)
 * @param pageSize How many rows to fetch each time
 * @param maxConcurrency How many reads are concurrently called from Cassandra
 * @return a Completable representing the compression work
 */
@SuppressWarnings("unchecked") Completable compressBlock(long startTimeSlice, int pageSize, int maxConcurrency);

/**
* Compresses the given range between timestamps to a single block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,9 @@ public Completable verifyAndCreateTempTables(ZonedDateTime startTime, ZonedDateT

@Override
@SuppressWarnings("unchecked")
public Completable compressBlock(long startTimeSlice, int pageSize) {
public Completable compressBlock(long startTimeSlice, int pageSize, int maxConcurrency) {
return Completable.fromObservable(
dataAccess.findAllDataFromBucket(startTimeSlice, pageSize)
dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
.flatMap(rows -> rows
.publish(p -> p.window(
p.map(Row::getPartitionKeyToken)
Expand All @@ -747,7 +747,7 @@ public Completable compressBlock(long startTimeSlice, int pageSize) {
return dataAccess.insertCompressedData(metricId, startTimeSlice, cpc, getTTL(metricId))
.mergeWith(updateMetricExpiration(metricId).map(rs -> null));
});
}))
}), maxConcurrency)
).doOnCompleted(() -> log.infof("Compress part completed"))
.andThen(dataAccess.dropTempTable(startTimeSlice));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public <T> Observable<Row> findMetricsInMetricsIndex(String tenantId, MetricType
return delegate.findMetricsInMetricsIndex(tenantId, type);
}

@Override public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize) {
return delegate.findAllDataFromBucket(timestamp, pageSize);
@Override public Observable<Observable<Row>> findAllDataFromBucket(long timestamp, int pageSize, int maxConcurrency) {
return delegate.findAllDataFromBucket(timestamp, pageSize, maxConcurrency);
}

@Override public Completable dropTempTable(long timestamp) {
Expand Down

0 comments on commit e0bce48

Please sign in to comment.