Skip to content

Commit

Permalink
[HWKMETRICS-613] Schedule the job that deletes expired metrics to run…
Browse files Browse the repository at this point in the history
… daily at a configured interval; the default is every 7 days.
  • Loading branch information
Stefan Negrea committed Mar 29, 2017
1 parent 4136201 commit ef12a5c
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -58,4 +58,8 @@ public String get(String name, String defaultValue) {
public void set(String name, String value) {
properties.put(name, value);
}

public void delete(String name) {
properties.remove(name);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -40,6 +40,8 @@ public class ConfigurationService {

private PreparedStatement updateConfigurationValue;

private PreparedStatement deleteConfigurationValue;

// TODO make async
// I could have just as easily passed the session as a constructor arg. I am doing it in the init method because
// eventually I would like service initialization async.
Expand All @@ -56,6 +58,10 @@ public void init(RxSession session) {
updateConfigurationValue = session.getSession().prepare(
"INSERT INTO sys_config (config_id, name, value) VALUES (?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

deleteConfigurationValue = session.getSession().prepare(
"DELETE FROM sys_config WHERE config_id =? and name = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}

public Observable<Configuration> load(String id) {
Expand Down Expand Up @@ -91,4 +97,7 @@ public Observable<Void> save(String configId, String name, String value, Schedul
return session.execute(updateConfigurationValue.bind(configId, name, value), scheduler).map(resultSet -> null);
}

public Observable<Void> delete(String configId, String name) {
return session.execute(deleteConfigurationValue.bind(configId, name)).map(resultSet -> null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public List<JobDetails> start() {

deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, this.metricExpirationDelay);
scheduler.register(DeleteExpiredMetrics.JOB_NAME, deleteExpiredMetrics);
maybeScheduleMetricExpirationJob(backgroundJobs);

return backgroundJobs;
}
Expand Down Expand Up @@ -160,6 +161,49 @@ private void maybeScheduleCompressData(List<JobDetails> backgroundJobs) {
}
}

private void maybeScheduleMetricExpirationJob(List<JobDetails> backgroundJobs) {
String jobIdConfigKey = "jobId";
String jobFrequencyKey = "jobFrequency";

String configId = "org.hawkular.metrics.jobs." + DeleteExpiredMetrics.JOB_NAME;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));

if (config.get(jobIdConfigKey) != null) {
Integer configuredJobFrequency = null;
try {
configuredJobFrequency = Integer.parseInt(config.get("jobFrequency"));
} catch (Exception e) {
//do nothing, the parsing failed which makes the value unknown
}

if (configuredJobFrequency == null || configuredJobFrequency != this.metricExpirationJobFrequencyInDays) {
scheduler.unscheduleJob(config.get(jobIdConfigKey)).await();
configurationService.delete(configId, jobIdConfigKey).toBlocking();
config.delete(jobIdConfigKey);
configurationService.delete(configId, jobFrequencyKey).toBlocking();
config.delete(jobFrequencyKey);
}
}

if (config.get(jobIdConfigKey) == null) {
logger.info("Preparing to create and schedule " + DeleteExpiredMetrics.JOB_NAME + " job");

//Get start of next day
long nextStart = DateTimeService.current24HourTimeSlice().plusDays(1).getMillis();
JobDetails jobDetails = scheduler.scheduleJob(DeleteExpiredMetrics.JOB_NAME, DeleteExpiredMetrics.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(this.metricExpirationJobFrequencyInDays, TimeUnit.DAYS).build())
.toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, jobIdConfigKey, jobDetails.getJobId().toString()).toBlocking();
configurationService.save(configId, jobFrequencyKey, this.metricExpirationJobFrequencyInDays + "")
.toBlocking();

logger.info("Created and scheduled " + jobDetails);
}
}

// JMX management
private void submitCompressJob(Map<String, String> parameters) {
String jobName = String.format("%s_single_%s", CompressData.JOB_NAME, parameters.get(CompressData.TARGET_TIME));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -41,6 +41,14 @@ public interface Scheduler {
*/
Single<JobDetails> scheduleJob(String type, String name, Map<String, String> parameters, Trigger trigger);

/**
* Deletes all the scheduled execution for a job id.
*
* @param jobId
* @return Completable instance
*/
Completable unscheduleJob(String jobId);

/**
* Register a function that produces a job of the specified type. This method should be called prior to scheduling
* any jobs of the specified type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;

import rx.Completable;
import rx.Observable;

/**
Expand All @@ -60,6 +61,8 @@ public class JobsService {

private PreparedStatement updateStatus;

private PreparedStatement deleteScheduled;

public JobsService(RxSession session) {
this.session = session;
findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx");
Expand All @@ -75,6 +78,9 @@ public JobsService(RxSession session) {
"status) VALUES (?, ?, ?, ?, ?, ?, ?)");
updateStatus = session.getSession().prepare(
"UPDATE scheduled_jobs_idx SET status = ? WHERE time_slice = ? AND job_id = ?");

deleteScheduled = session.getSession().prepare(
"DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
}

public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler scheduler) {
Expand Down Expand Up @@ -104,6 +110,13 @@ public Observable<JobDetails> findJobs(Date timeSlice, rx.Scheduler scheduler) {
.flatMap(map -> Observable.from(map.values()));
}

public Completable deleteJob(UUID jobId, rx.Scheduler scheduler) {
return session.executeAndFetch(findTimeSlices.bind(), scheduler)
.map(row -> row.getTimestamp(0))
.flatMap(timeSlice -> session.execute(deleteScheduled.bind(timeSlice, jobId)))
.toCompletable();
}

/**
* This method is currently unused.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ public Single<JobDetails> scheduleJob(String type, String name, Map<String, Stri
.doOnError(t -> logger.warn("Failed to schedule job " + name, t));
}

@Override
public Completable unscheduleJob(String jobId) {
return jobsService.deleteJob(UUID.fromString(jobId), queryScheduler);
}

@Override
public void start() {
running = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void register(String jobType, Func1<JobDetails, Completable> jobProducer,
scheduler.register(jobType, jobProducer, retryFunction);
}

@Override
public Completable unscheduleJob(String jobId) {
return scheduler.unscheduleJob(jobId);
}

@Override
public void start() {
scheduler.start();
Expand Down

0 comments on commit ef12a5c

Please sign in to comment.