Skip to content

Commit

Permalink
Merge pull request #818 from burmanm/hwkmetrics_614
Browse files Browse the repository at this point in the history
[HWKMETRICS-614] Temporary tables for writes
  • Loading branch information
John Sanda committed Jul 7, 2017
2 parents ea3f75f + 05712b0 commit 8f89484
Show file tree
Hide file tree
Showing 32 changed files with 2,075 additions and 956 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ private void startMetricsService() {

new CassandraDriverMetrics(session, metricRegistry).registerAll();

initJobsService();

if (Boolean.valueOf(metricsReportingEnabled)) {
DropWizardReporter reporter = new DropWizardReporter(metricRegistry, metricNameService, metricsService);
int interval = Integer.getInteger(collectionIntervalConfig, 180);
Expand All @@ -460,8 +462,6 @@ private void startMetricsService() {

metricsServiceReady.fire(new ServiceReadyEvent(metricsService.insertedDataEvents()));

initJobsService();

initGCGraceSecondsManager();

if (Boolean.parseBoolean(jmxReportingEnabled)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -111,4 +111,14 @@ public static TemporalAdjuster startOfNextOddHour() {
};
}

public static TemporalAdjuster startOfPreviousEvenHour() {
return temporal -> {
int currentHour = temporal.get(ChronoField.HOUR_OF_DAY);
return temporal.minus((currentHour % 2 == 0) ? 0 : 1, ChronoUnit.HOURS)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0)
.with(ChronoField.NANO_OF_SECOND, 0);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Completable call(JobDetails jobDetails) {
PublishSubject<Metric<?>> subject = PublishSubject.create();
subject.subscribe(metric -> {
try {
this.metricsService.updateMetricExpiration(metric);
this.metricsService.updateMetricExpiration(metric.getMetricId());
} catch (Exception e) {
logger.error("Could not update the metric expiration index for metric " + metric.getId()
+ " of tenant " + metric.getTenantId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,14 +17,12 @@
package org.hawkular.metrics.core.jobs;

import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;

import rx.Completable;
import rx.Observable;
Expand All @@ -42,15 +40,9 @@ public class DeleteTenant implements Func1<JobDetails, Completable> {
private RxSession session;

private PreparedStatement deleteTenant;

private PreparedStatement deleteData;

private PreparedStatement deleteFromMetricsIndex;

private PreparedStatement findTags;

private PreparedStatement deleteTag;

private PreparedStatement deleteRetentions;

private MetricsService metricsService;
Expand All @@ -59,8 +51,6 @@ public DeleteTenant(RxSession session, MetricsService metricsService) {
this.session = session;
this.metricsService = metricsService;
deleteTenant = session.getSession().prepare("DELETE FROM tenants WHERE id = ?");
deleteData = session.getSession().prepare(
"DELETE FROM data WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = 0");
deleteFromMetricsIndex = session.getSession().prepare(
"DELETE FROM metrics_idx WHERE tenant_id = ? AND type = ?");
findTags = session.getSession().prepare("SELECT DISTINCT tenant_id, tname FROM metrics_tags_idx");
Expand All @@ -72,51 +62,47 @@ public DeleteTenant(RxSession session, MetricsService metricsService) {
public Completable call(JobDetails details) {
String tenantId = details.getParameters().get("tenantId");


// The concat operator is used instead of merge to ensure things execute in order. The deleteMetricData
// method queries the metrics index, so we want to update the index only after we have finished deleting
// data.
return Completable.concat(
deleteMetricData(tenantId).toCompletable()
.doOnCompleted(() -> logger.debug("Finished deleting metrics for " + tenantId)),
deleteRetentions(tenantId).toCompletable()
.doOnCompleted(() -> logger.debug("Finished deleting retentions for " + tenantId)),
deleteMetricsIndex(tenantId).toCompletable()
.doOnCompleted(() -> logger.debug("Finished updating metrics index")),
deleteTags(tenantId).toCompletable()
.doOnCompleted(() -> logger.debug("Finished deleting metric tags")),
deleteTenant(tenantId).toCompletable()
.doOnCompleted(() -> logger.debug("Finished updating tenants table for " + tenantId))
).doOnCompleted(() -> logger.debug("Finished deleting " + tenantId));
return Completable.fromObservable(
deleteMetricData(tenantId)
.concatWith(deleteTenant(tenantId))
.concatWith(deleteRetentions(tenantId))
.concatWith(deleteMetricsIndex(tenantId))
.concatWith(deleteTags(tenantId))
)
.doOnCompleted(() -> logger.infof("Finished deleting " + tenantId));
}

private Observable<ResultSet> deleteMetricData(String tenantId) {
private Observable<Void> deleteMetricData(String tenantId) {
return Observable.from(MetricType.all())
.flatMap(type -> metricsService.findMetrics(tenantId, type).flatMap(this::deleteMetricData));
}

private <T> Observable<ResultSet> deleteMetricData(Metric<T> metric) {
return session.execute(deleteData.bind(metric.getMetricId().getTenantId(),
metric.getMetricId().getType().getCode(), metric.getMetricId().getName()));
.flatMap(type -> metricsService.findMetrics(tenantId, type))
.flatMap(metric -> metricsService.deleteMetric(metric.getMetricId()));
}

private Observable<ResultSet> deleteMetricsIndex(String tenantId) {
private Observable<Void> deleteMetricsIndex(String tenantId) {
return Observable.from(MetricType.all())
.flatMap(type -> session.execute(deleteFromMetricsIndex.bind(tenantId, type.getCode())));
.flatMap(type -> session.execute(deleteFromMetricsIndex.bind(tenantId, type.getCode())))
.map(r -> null);
}

private Observable<ResultSet> deleteTags(String tenantId) {
private Observable<Void> deleteTags(String tenantId) {
return session.execute(findTags.bind())
.flatMap(Observable::from)
.filter(row -> row.getString(0).equals(tenantId))
.flatMap(row -> session.execute(deleteTag.bind(row.getString(0), row.getString(1))));
.flatMap(row -> session.execute(deleteTag.bind(row.getString(0), row.getString(1))))
.map(r -> null);
}

private <T> Observable<ResultSet> deleteRetentions(String tenantId) {
private Observable<Void> deleteRetentions(String tenantId) {
return Observable.from(MetricType.all())
.flatMap(type -> session.execute(deleteRetentions.bind(tenantId, type.getCode())));
.flatMap(type -> session.execute(deleteRetentions.bind(tenantId, type.getCode())))
.map(r -> null);
}

private Observable<ResultSet> deleteTenant(String tenantId) {
return session.execute(deleteTenant.bind(tenantId));
private Observable<Void> deleteTenant(String tenantId) {
return session.execute(deleteTenant.bind(tenantId)).map(r -> null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -47,6 +48,8 @@
*/
public class JobsServiceImpl implements JobsService, JobsServiceImplMBean {

public static final String CONFIG_PREFIX = "org.hawkular.metrics.jobs.";

private static Logger logger = Logger.getLogger(JobsServiceImpl.class);

private Scheduler scheduler;
Expand Down Expand Up @@ -111,8 +114,15 @@ public List<JobDetails> start() {
};
scheduler.register(DeleteTenant.JOB_NAME, deleteTenant, deleteTenantRetryPolicy);

CompressData compressDataJob = new CompressData(metricsService, configurationService);
scheduler.register(CompressData.JOB_NAME, compressDataJob);
TempTableCreator tempCreator = new TempTableCreator(metricsService, configurationService);
scheduler.register(TempTableCreator.JOB_NAME, tempCreator);
maybeScheduleTableCreator(backgroundJobs);

TempDataCompressor tempJob = new TempDataCompressor(metricsService, configurationService);
scheduler.register(TempDataCompressor.JOB_NAME, tempJob);

// CompressData compressDataJob = new CompressData(metricsService, configurationService);
// scheduler.register(CompressData.JOB_NAME, compressDataJob);
maybeScheduleCompressData(backgroundJobs);

deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, configurationService,
Expand Down Expand Up @@ -143,18 +153,45 @@ public Single<? extends JobDetails> submitDeleteExpiredMetricsJob(long expiratio
new SingleExecutionTrigger.Builder().withDelay(1, TimeUnit.MINUTES).build());
}

private void maybeScheduleTableCreator(List<JobDetails> backgroundJobs) {
String configId = TempTableCreator.CONFIG_ID;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));
if (config.get("jobId") == null) {
long nextTrigger = LocalDateTime.now(ZoneOffset.UTC)
.truncatedTo(ChronoUnit.MINUTES).plusMinutes(2)
.toInstant(ZoneOffset.UTC).toEpochMilli();

JobDetails jobDetails = scheduler.scheduleJob(TempTableCreator.JOB_NAME, TempTableCreator.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextTrigger)
.withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();
logger.info("Scheduled temporary table creator " + jobDetails);
}
}

private void maybeScheduleCompressData(List<JobDetails> backgroundJobs) {
String configId = "org.hawkular.metrics.jobs." + CompressData.JOB_NAME;
String configId = TempDataCompressor.CONFIG_ID;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));
if (config.get("jobId") == null) {
logger.info("Preparing to create and schedule " + CompressData.JOB_NAME + " job");
logger.info("Preparing to create and schedule " + TempDataCompressor.JOB_NAME + " job");

// Get next start of odd hour
long nextStart = LocalDateTime.now(ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli();
JobDetails jobDetails = scheduler.scheduleJob(CompressData.JOB_NAME, CompressData.JOB_NAME,

// CompressData
// JobDetails jobDetails = scheduler.scheduleJob(CompressData.JOB_NAME, CompressData.JOB_NAME,
// ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
// .withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
// backgroundJobs.add(jobDetails);
// configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();

// Temp table processing
JobDetails jobDetails = scheduler.scheduleJob(TempDataCompressor.JOB_NAME, TempDataCompressor.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
backgroundJobs.add(jobDetails);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.jobs;

import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.Trigger;
import org.hawkular.metrics.sysconfig.Configuration;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.jboss.logging.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;

import com.google.common.base.Stopwatch;

import rx.Completable;
import rx.functions.Func1;

/**
* @author Michael Burman
*/
public class TempDataCompressor implements Func1<JobDetails, Completable> {

private static Logger logger = Logger.getLogger(TempDataCompressor.class);

public static final String JOB_NAME = "TEMP_DATA_COMPRESSOR";
public static final String CONFIG_ID = JobsServiceImpl.CONFIG_PREFIX + JOB_NAME;
public static final String CONFIG_PAGE_SIZE = "page-size";
public static final String CONFIG_MAX_READ_CONCURRENCY = "concurrency.read.max";

private static final int DEFAULT_PAGE_SIZE = 1000;
private static final int DEFAULT_READ_CONCURRENCY = 2;

private MetricsService metricsService;

private int pageSize;
private boolean enabled;
private int maxReadConcurrency = DEFAULT_READ_CONCURRENCY;

public TempDataCompressor(MetricsService service, ConfigurationService configurationService) {
metricsService = service;
Configuration configuration = configurationService.load(CONFIG_ID).toSingle().toBlocking().value();
if (configuration.get(CONFIG_PAGE_SIZE) == null) {
pageSize = DEFAULT_PAGE_SIZE;
} else {
pageSize = Integer.parseInt(configuration.get(CONFIG_PAGE_SIZE));
}

if(configuration.get(CONFIG_MAX_READ_CONCURRENCY) != null) {
maxReadConcurrency = Integer.parseInt(configuration.get(CONFIG_MAX_READ_CONCURRENCY));
}

String enabledConfig = configuration.get("enabled", "true");
enabled = Boolean.parseBoolean(enabledConfig);
logger.debugf("Job enabled? %b", enabled);
}

@Override
public Completable call(JobDetails jobDetails) {
Duration runtimeBlockSize = Duration.standardHours(2);

Trigger trigger = jobDetails.getTrigger();
DateTime timeSliceInclusive = new DateTime(trigger.getTriggerTime(), DateTimeZone.UTC).minus(runtimeBlockSize);

// Rewind to previous timeslice
DateTime timeSliceStart = DateTimeService.getTimeSlice(timeSliceInclusive, runtimeBlockSize);
long startOfSlice = timeSliceStart.getMillis();

Stopwatch stopwatch = Stopwatch.createStarted();
logger.infof("Starting to process temp table for starting time of %s", timeSliceStart.toString());

// TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
return metricsService.compressBlock(startOfSlice, pageSize, maxReadConcurrency)
.doOnCompleted(() -> {
stopwatch.stop();
logger.info("Finished processing data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) +
" ms");
});
}
}

0 comments on commit 8f89484

Please sign in to comment.