Skip to content

Commit

Permalink
[HWKMETRICS-756] Previous commits did not backport all changes for ex…
Browse files Browse the repository at this point in the history
…pired metrics job
  • Loading branch information
jsanda committed Jun 25, 2018
1 parent 18cec7d commit ef74865
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,19 @@
*/
package org.hawkular.metrics.core.jobs;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.service.MetricsService;
import org.hawkular.metrics.datetime.DateTimeService;
import org.hawkular.metrics.scheduler.api.JobDetails;
import org.hawkular.metrics.scheduler.api.RepeatingTrigger;
import org.hawkular.metrics.scheduler.api.Scheduler;
import org.hawkular.metrics.scheduler.api.SingleExecutionTrigger;
import org.hawkular.metrics.sysconfig.Configuration;
import org.hawkular.metrics.sysconfig.ConfigurationService;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.jboss.logging.Logger;

import com.google.common.collect.ImmutableMap;

import rx.Completable;
import rx.Single;

/**
Expand Down Expand Up @@ -82,25 +72,16 @@ public void setScheduler(Scheduler scheduler) {

@Override
public void start() {
List<JobDetails> backgroundJobs = new ArrayList<>();

// The CompressData job has been replaced by the TempDataCompressor job
unscheduleCompressData();
unscheduleDeleteExpiredMetrics();

deleteTenant = new DeleteTenant(session, metricsService);

scheduler.register(DeleteTenant.JOB_NAME, deleteTenant);

TempTableCreator tempCreator = new TempTableCreator(metricsService, configurationService);
scheduler.register(TempTableCreator.JOB_NAME, tempCreator);
maybeScheduleTableCreator(backgroundJobs);

TempDataCompressor tempJob = new TempDataCompressor(metricsService, configurationService);
scheduler.register(TempDataCompressor.JOB_NAME, tempJob);

maybeScheduleTempDataCompressor(backgroundJobs);

scheduler.start();
}

Expand All @@ -115,84 +96,6 @@ public Single<? extends JobDetails> submitDeleteTenantJob(String tenantId, Strin
new SingleExecutionTrigger.Builder().withDelay(1, TimeUnit.MINUTES).build());
}

private void maybeScheduleTableCreator(List<JobDetails> backgroundJobs) {
String configId = TempTableCreator.CONFIG_ID;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));
if (config.get("jobId") == null) {
long nextTrigger = LocalDateTime.now(ZoneOffset.UTC)
.truncatedTo(ChronoUnit.MINUTES).plusMinutes(2)
.toInstant(ZoneOffset.UTC).toEpochMilli();

JobDetails jobDetails = scheduler.scheduleJob(TempTableCreator.JOB_NAME, TempTableCreator.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextTrigger)
.withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();
logger.info("Scheduled temporary table creator " + jobDetails);
}
}

private void unscheduleCompressData() {
Configuration config = configurationService.load(CompressData.CONFIG_ID).toBlocking()
.firstOrDefault(new Configuration(CompressData.CONFIG_ID, new HashMap<>()));
String jobId = config.get("jobId");

if (config.getProperties().isEmpty()) {
// This means we have a new installation and not an upgrade. The CompressData job has not been previously
// installed so there is no db clean up necessary.
} else {
Completable unscheduled;
if (jobId == null) {
logger.warnf("Expected to find a jobId property in database for %s. Attempting to unschedule job by " +
"name.", CompressData.JOB_NAME);
unscheduled = scheduler.unscheduleJobByTypeAndName(CompressData.JOB_NAME, CompressData.JOB_NAME);
} else {
unscheduled = scheduler.unscheduleJobById(jobId);
}
unscheduled.await();
if (!config.getProperties().isEmpty()) {
configurationService.delete(CompressData.CONFIG_ID).await();
}
}
}

private void unscheduleDeleteExpiredMetrics() {
String jobName = "DELETE_EXPIRED_METRICS";
String configId = "org.hawkular.metrics.jobs.DELETE_EXPIRED_METRICS";
// We load the configuration first so that delete is done only if it exists in order to avoid generating
// tombstones.
Completable deleteConfig = configurationService.load(configId)
.map(config -> configurationService.delete(configId))
.toCompletable();
// unscheduleJobByTypeAndName will not generate unnecessary tombstones as it does reads before writes
Completable unscheduleJob = scheduler.unscheduleJobByTypeAndName(jobName, jobName);
Completable.merge(deleteConfig, unscheduleJob).await();
}

private void maybeScheduleTempDataCompressor(List<JobDetails> backgroundJobs) {
String configId = TempDataCompressor.CONFIG_ID;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));
if (config.get("jobId") == null) {
logger.info("Preparing to create and schedule " + TempDataCompressor.JOB_NAME + " job");

// Get next start of odd hour
long nextStart = LocalDateTime.now(ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli();

// Temp table processing
JobDetails jobDetails = scheduler.scheduleJob(TempDataCompressor.JOB_NAME, TempDataCompressor.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();

logger.info("Created and scheduled " + jobDetails);
}
}

// JMX management
private void submitCompressJob(Map<String, String> parameters) {
String jobName = String.format("%s_single_%s", CompressData.JOB_NAME, parameters.get(CompressData.TARGET_TIME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ public class JobsManager {

public static final String TEMP_DATA_COMPRESSOR_CONFIG_ID = "org.hawkular.metrics.jobs." + TEMP_DATA_COMPRESSOR_JOB;

public static final String DELETE_EXPIRED_METRICS_JOB = "DELETE_EXPIRED_METRICS";

public static final String DELETE_EXPIRED_METRICS_CONFIG_ID = "org.hawkular.metrics.jobs." +
DELETE_EXPIRED_METRICS_JOB;

private ConfigurationService configurationService;
private Scheduler scheduler;

Expand All @@ -80,15 +75,15 @@ public List<JobDetails> installJobs() {
List<JobDetails> backgroundJobs = new ArrayList<>();

unscheduleCompressData();
unscheduleDeleteExpiredMetrics();
maybeScheduleTableCreator(backgroundJobs);
maybeScheduleTempDataCompressor(backgroundJobs);
maybeScheduleMetricExpirationJob(backgroundJobs);

return backgroundJobs;
}

private void unscheduleCompressData() {
Configuration config = configurationService.load(COMPRESS_DATA_JOB).toBlocking()
Configuration config = configurationService.load(COMPRESS_DATA_CONFIG_ID).toBlocking()
.firstOrDefault(new Configuration(COMPRESS_DATA_CONFIG_ID, new HashMap<>()));
String jobId = config.get("jobId");

Expand All @@ -111,6 +106,21 @@ private void unscheduleCompressData() {
}
}

private void unscheduleDeleteExpiredMetrics() {
String jobName = "DELETE_EXPIRED_METRICS";
String configId = "org.hawkular.metrics.jobs.DELETE_EXPIRED_METRICS";

// We load the configuration first so that delete is done only if it exists in order to avoid generating
// tombstones.
Completable deleteConfig = configurationService.load(configId)
.map(config -> configurationService.delete(configId))
.toCompletable();
// unscheduleJobByTypeAndName will not generate unnecessary tombstones as it does reads before writes
Completable unscheduleJob = scheduler.unscheduleJobByTypeAndName(jobName, jobName);

Completable.merge(deleteConfig, unscheduleJob).await();
}

private void maybeScheduleTableCreator(List<JobDetails> backgroundJobs) {
Configuration config = configurationService.load(TEMP_TABLE_CREATE_CONFIG_ID).toBlocking()
.firstOrDefault(new Configuration(TEMP_TABLE_CREATE_CONFIG_ID, new HashMap<>()));
Expand Down Expand Up @@ -152,53 +162,4 @@ private void maybeScheduleTempDataCompressor(List<JobDetails> backgroundJobs) {
}
}

private void maybeScheduleMetricExpirationJob(List<JobDetails> backgroundJobs) {
int metricExpirationJobFrequencyInDays = Integer.getInteger("hawkular.metrics.jobs.expiration.frequency", 7);
boolean metricExpirationJobEnabled = Boolean.getBoolean("hawkular.metrics.jobs.expiration.enabled");

String jobIdConfigKey = "jobId";
String jobFrequencyKey = "jobFrequency";

Configuration config = configurationService.load(DELETE_EXPIRED_METRICS_CONFIG_ID).toBlocking()
.firstOrDefault(new Configuration(DELETE_EXPIRED_METRICS_CONFIG_ID, new HashMap<>()));

if (config.get(jobIdConfigKey) != null) {
Integer configuredJobFrequency = null;
try {
configuredJobFrequency = Integer.parseInt(config.get(jobFrequencyKey));
} catch (Exception e) {
//do nothing, the parsing failed which makes the value unknown
}

if (configuredJobFrequency == null || configuredJobFrequency != metricExpirationJobFrequencyInDays
|| metricExpirationJobFrequencyInDays <= 0 || configuredJobFrequency <= 0 ||
!metricExpirationJobEnabled) {
scheduler.unscheduleJobById(config.get(jobIdConfigKey)).await();
configurationService.delete(DELETE_EXPIRED_METRICS_CONFIG_ID, jobIdConfigKey)
.concatWith(configurationService.delete(DELETE_EXPIRED_METRICS_CONFIG_ID, jobFrequencyKey))
.await();
config.delete(jobIdConfigKey);
config.delete(jobFrequencyKey);
}
}

if (config.get(jobIdConfigKey) == null && metricExpirationJobFrequencyInDays > 0
&& metricExpirationJobEnabled) {
logger.info("Preparing to create and schedule " + DELETE_EXPIRED_METRICS_JOB + " job");

//Get start of next day
long nextStart = DateTimeService.current24HourTimeSlice().plusDays(1).getMillis();
JobDetails jobDetails = scheduler.scheduleJob(DELETE_EXPIRED_METRICS_JOB, DELETE_EXPIRED_METRICS_JOB,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(metricExpirationJobFrequencyInDays, TimeUnit.DAYS).build()).toBlocking()
.value();
backgroundJobs.add(jobDetails);
configurationService.save(DELETE_EXPIRED_METRICS_CONFIG_ID, jobIdConfigKey, jobDetails.getJobId()
.toString()).toBlocking();
configurationService.save(DELETE_EXPIRED_METRICS_CONFIG_ID, jobFrequencyKey,
metricExpirationJobFrequencyInDays + "").toBlocking();

logger.info("Created and scheduled " + jobDetails);
}
}
}

0 comments on commit ef74865

Please sign in to comment.