Skip to content

Commit

Permalink
[HWKMETRICS-613] Update DataAccess.updateMetricExpirationIndex to ret…
Browse files Browse the repository at this point in the history
…urn Observable. The scheduler is now started after all the recurring jobs are scheduled. And log errors when metrics cannot be deleted.
  • Loading branch information
Stefan Negrea committed Mar 31, 2017
1 parent 6c35b62 commit 6f34f0f
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;

import com.datastax.driver.core.PreparedStatement;

Expand All @@ -34,6 +35,9 @@
* @author Stefan Negrea
*/
public class DeleteExpiredMetrics implements Func1<JobDetails, Completable> {

private static Logger logger = Logger.getLogger(DeleteExpiredMetrics.class);

public static final String JOB_NAME = "DELETE_EXPIRED_METRICS";

private MetricsService metricsService;
Expand All @@ -42,7 +46,6 @@ public class DeleteExpiredMetrics implements Func1<JobDetails, Completable> {
private PreparedStatement findEligibleTenants;
private PreparedStatement findEligibleMetrics;
private PreparedStatement findUnexpiredDataPoints;

private long metricExpirationDelay;

public DeleteExpiredMetrics(MetricsService metricsService, RxSession session,
Expand Down Expand Up @@ -108,7 +111,10 @@ public Completable call(JobDetails jobDetails) {
}

return expirationIndexResults
.flatMap(metricId -> metricsService.deleteMetric(metricId))
.concatMap(metricId -> metricsService.deleteMetric(metricId))
.doOnError(e -> {
logger.error("Failed to delete metric data", e);
})
.toCompletable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ public void setScheduler(Scheduler scheduler) {

@Override
public List<JobDetails> start() {
scheduler.start();

List<JobDetails> backgroundJobs = new ArrayList<>();

deleteTenant = new DeleteTenant(session, metricsService);
Expand All @@ -120,6 +118,8 @@ public List<JobDetails> start() {
scheduler.register(DeleteExpiredMetrics.JOB_NAME, deleteExpiredMetrics);
maybeScheduleMetricExpirationJob(backgroundJobs);

scheduler.start();

return backgroundJobs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ <T> Observable<ResultSet> deleteAndInsertCompressedGauge(MetricId<T> id, long ti
CompressedPointContainer cpc,
long sliceStart, long sliceEnd, int ttl);

<T> ResultSetFuture updateMetricExpirationIndex(MetricId<T> id, long expirationTime);
<T> Observable<ResultSet> updateMetricExpirationIndex(MetricId<T> id, long expirationTime);

<T> Observable<ResultSet> deleteFromMetricExpirationIndex(MetricId<T> id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,8 @@ public Observable<Row> findAllMetricsFromTagsIndex() {
}

@Override
public <T> ResultSetFuture updateMetricExpirationIndex(MetricId<T> id, long expirationTime) {
return session.executeAsync(updateMetricExpirationIndex.bind(id.getTenantId(),
public <T> Observable<ResultSet> updateMetricExpirationIndex(MetricId<T> id, long expirationTime) {
return rxSession.execute(updateMetricExpirationIndex.bind(id.getTenantId(),
id.getType().getCode(), id.getName(), new Date(expirationTime)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,7 @@ public <T> Observable<Void> updateMetricExpiration(Metric<T> metric) {
expiration = DateTimeService.now.get().getMillis() + this.getTTL(metric.getMetricId()) * DAY_TO_MILLIS;
}

return ListenableFutureObservable
.from(dataAccess.updateMetricExpirationIndex(metric.getMetricId(), expiration), metricsTasks)
return dataAccess.updateMetricExpirationIndex(metric.getMetricId(), expiration)
.doOnError(t -> log.error("Failure to update expiration index", t))
.flatMap(r -> null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import com.google.common.collect.ImmutableMap;

import rx.Observable;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;

/**
* Test the job that deletes expired metrics.
Expand Down Expand Up @@ -229,9 +227,8 @@ public void testScheduleDeleteExpiredMetricsJob() throws Exception {
ImmutableMap.of("x", "2", "y", "3"), 12);

doAction(() -> metricsService.createMetric(c1, true));
ListenableFutureObservable
.from(dataAccess.updateMetricExpirationIndex(c1.getMetricId(),
DateTimeService.now.get().getMillis() - 3 * 24 * 3600 * 1000L), Schedulers.immediate());
dataAccess.updateMetricExpirationIndex(c1.getMetricId(),
DateTimeService.now.get().getMillis() - 3 * 24 * 3600 * 1000L).toBlocking();
doAction(() -> metricsService.createMetric(c2, true));

List<Metric<Long>> metrics = getOnNextEvents(() -> metricsService.findMetrics(tenantId, COUNTER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public Observable<Row> findAllMetricsFromTagsIndex() {
}

@Override
public <T> ResultSetFuture updateMetricExpirationIndex(MetricId<T> id, long expirationTime) {
public <T> Observable<ResultSet> updateMetricExpirationIndex(MetricId<T> id, long expirationTime) {
return delegate.updateMetricExpirationIndex(id, expirationTime);
}

Expand Down

0 comments on commit 6f34f0f

Please sign in to comment.