Skip to content

Commit

Permalink
[HWKMETRICS-613] If the compression job is disabled, then check to se…
Browse files Browse the repository at this point in the history
…e if there is still unexpired data for a metric before purging it.
  • Loading branch information
Stefan Negrea committed Mar 30, 2017
1 parent da1f884 commit 6c35b62
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.hawkular.rx.cassandra.driver.RxSession;

import com.datastax.driver.core.PreparedStatement;
Expand All @@ -37,20 +38,27 @@ public class DeleteExpiredMetrics implements Func1<JobDetails, Completable> {

private MetricsService metricsService;
private RxSession session;
private ConfigurationService configurationService;
private PreparedStatement findEligibleTenants;
private PreparedStatement findEligibleMetrics;
private PreparedStatement findUnexpiredDataPoints;

private long metricExpirationDelay;

public DeleteExpiredMetrics(MetricsService metricsService, RxSession session, int metricExpirationDelayInDays) {
public DeleteExpiredMetrics(MetricsService metricsService, RxSession session,
ConfigurationService configurationService, int metricExpirationDelayInDays) {
this.metricsService = metricsService;
this.session = session;
this.configurationService = configurationService;

findEligibleTenants = session.getSession()
.prepare("SELECT DISTINCT tenant_id, type FROM metrics_expiration_idx");
findEligibleMetrics = session.getSession()
.prepare(
"SELECT tenant_id, type, metric, time FROM metrics_expiration_idx WHERE tenant_id = ? AND type = ?");
findUnexpiredDataPoints = session.getSession()
.prepare(
"SELECT * FROM data WHERE tenant_id = ? AND type = ? AND metric = ? AND dpart = 0 LIMIT 1;");

this.metricExpirationDelay = metricExpirationDelayInDays * 24 * 3600 * 1000L;
}
Expand All @@ -70,12 +78,37 @@ public Completable call(JobDetails jobDetails) {
long expirationTime = (configuredExpirationTime != null ? configuredExpirationTime
: DateTimeService.now.get().getMillis()) - metricExpirationDelay;

return session.execute(findEligibleTenants.bind())
Observable<MetricId<?>> expirationIndexResults = session.execute(findEligibleTenants.bind())
.flatMap(Observable::from)
.flatMap(row -> session.execute(findEligibleMetrics.bind(row.getString(0), row.getByte(1))))
.flatMap(Observable::from)
.filter(row -> row.getTimestamp(3).getTime() < expirationTime)
.map(row -> new MetricId<>(row.getString(0), MetricType.fromCode(row.getByte(1)), row.getString(2)))
.flatMap(metricId -> metricsService.deleteMetric(metricId)).toCompletable();
.map(row -> new MetricId<>(row.getString(0), MetricType.fromCode(row.getByte(1)), row.getString(2)));

//If the compression job is disabled then check the data point table for data
String compressJobEnabledConfig = configurationService.load(CompressData.CONFIG_ID, "enabled").toBlocking()
.firstOrDefault(null);

boolean compressJobEnabled = false;
if (compressJobEnabledConfig != null && !compressJobEnabledConfig.isEmpty()) {
try {
compressJobEnabled = Boolean.parseBoolean(compressJobEnabledConfig);
} catch (Exception e) {
//do nothing, assume the compression job is disabled
}
}
if (!compressJobEnabled) {
expirationIndexResults = expirationIndexResults
.flatMap(r -> session
.execute(findUnexpiredDataPoints.bind(r.getTenantId(), r.getType().getCode(), r.getName()))
.flatMap(Observable::from)
.isEmpty()
.filter(empty -> empty)
.map(empty -> r));
}

return expirationIndexResults
.flatMap(metricId -> metricsService.deleteMetric(metricId))
.toCompletable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public List<JobDetails> start() {
scheduler.register(CompressData.JOB_NAME, compressDataJob);
maybeScheduleCompressData(backgroundJobs);

deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, this.metricExpirationDelay);
deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, configurationService,
this.metricExpirationDelay);
scheduler.register(DeleteExpiredMetrics.JOB_NAME, deleteExpiredMetrics);
maybeScheduleMetricExpirationJob(backgroundJobs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public void tearDown() {
}

@Test
public void testOnDemandDeleteExpiredMetricsJob() throws Exception {
public void testOnDemandDeleteExpiredMetricsJobCompressionEnabled() throws Exception {
configurationService.save(CompressData.CONFIG_ID, "enabled", Boolean.TRUE.toString()).toBlocking();

String tenantId = nextTenantId();
DateTime start = new DateTime(jobScheduler.now());

Expand Down Expand Up @@ -176,6 +178,34 @@ public void testOnDemandDeleteExpiredMetricsJob() throws Exception {
assertEquals(metrics.size(), 0);
}

@Test
public void testOnDemandDeleteExpiredMetricsJobCompressionDisabled() throws Exception {
configurationService.save(CompressData.CONFIG_ID, "enabled", Boolean.FALSE.toString()).toBlocking();

String tenantId = nextTenantId();
DateTime start = new DateTime(jobScheduler.now());

Metric<Double> g1 = new Metric<>(new MetricId<>(tenantId, GAUGE, "G1"),
ImmutableMap.of("x", "1", "y", "2"), 1);
Metric<Double> g2 = new Metric<>(new MetricId<>(tenantId, GAUGE, "G2"),
ImmutableMap.of("x", "2", "y", "3"), 2, asList(
new DataPoint<>(start.getMillis(), 3.3),
new DataPoint<>(start.plusMinutes(2).getMillis(), 4.4)));

doAction(() -> metricsService.createMetric(g1, true));
doAction(() -> metricsService.createMetric(g2, true));
doAction(() -> metricsService.addDataPoints(GAUGE, Observable.just(g2)));

List<Metric<Double>> metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 2);

long expiration = DateTimeService.now.get().getMillis() + 10 * 24 * 3600 * 1000L;
runOnDemandDeleteExpiredMetricsJob(expiration);
metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, GAUGE));
assertEquals(metrics.size(), 1);
assertEquals(metrics.get(0).getId(), "G2");
}

private void runOnDemandDeleteExpiredMetricsJob(long time) throws InterruptedException {
JobDetails runJobDetails = jobsService.submitDeleteExpiredMetricsJob(time, jobName).toBlocking().value();
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -190,6 +220,7 @@ private void runOnDemandDeleteExpiredMetricsJob(long time) throws InterruptedExc

@Test
public void testScheduleDeleteExpiredMetricsJob() throws Exception {
configurationService.save(CompressData.CONFIG_ID, "enabled", Boolean.TRUE.toString()).toBlocking();
String tenantId = nextTenantId();

Metric<Long> c1 = new Metric<>(new MetricId<>(tenantId, COUNTER, "C1"),
Expand Down

0 comments on commit 6c35b62

Please sign in to comment.