[HWKMETRICS-738] unschedule CompressData job and add error handling for unregistered jobs

John Sanda committed Sep 27, 2017
1 parent 29147ec commit 2273fbf
Showing 7 changed files with 117 additions and 21 deletions.
@@ -24,6 +24,7 @@
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;

import rx.Completable;
import rx.Observable;
import rx.Scheduler;

@@ -42,6 +43,8 @@ public class ConfigurationService {

private PreparedStatement deleteConfigurationValue;

private PreparedStatement deleteConfiguration;

// TODO make async
// I could have just as easily passed the session as a constructor arg. I am doing it in the init method because
// eventually I would like service initialization to be async.
@@ -62,6 +65,10 @@ public void init(RxSession session) {
deleteConfigurationValue = session.getSession().prepare(
"DELETE FROM sys_config WHERE config_id =? and name = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

deleteConfiguration = session.getSession().prepare(
"DELETE FROM sys_config WHERE config_id = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}

public Observable<Configuration> load(String id) {
@@ -97,7 +104,11 @@ public Observable<Void> save(String configId, String name, String value, Schedul
return session.execute(updateConfigurationValue.bind(configId, name, value), scheduler).map(resultSet -> null);
}

public Observable<Void> delete(String configId, String name) {
return session.execute(deleteConfigurationValue.bind(configId, name)).map(resultSet -> null);
public Completable delete(String configId, String name) {
return session.execute(deleteConfigurationValue.bind(configId, name)).toCompletable();
}

public Completable delete(String configId) {
return session.execute(deleteConfiguration.bind(configId)).toCompletable();
}
}
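
The delete methods above now return rx.Completable instead of Observable<Void>, so callers observe only completion or failure rather than mapping result sets to null. A minimal, self-contained sketch of that pattern (RxJava 1.x); the Observable of strings is a stand-in for the result-set stream returned by session.execute(...), which is an assumption about the surrounding RxSession API:

import rx.Completable;
import rx.Observable;

public class ToCompletableSketch {
    public static void main(String[] args) {
        // Stand-in for session.execute(deleteConfiguration.bind(configId)).
        Observable<String> resultSets = Observable.just("resultSet");

        // toCompletable() drops the emitted items and signals only completion or error,
        // which is all a DELETE caller cares about.
        Completable deleted = resultSets.toCompletable();
        deleted.await();
        System.out.println("configuration deleted");
    }
}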
@@ -40,6 +40,7 @@

import com.google.common.collect.ImmutableMap;

import rx.Completable;
import rx.Single;
import rx.functions.Func2;

@@ -102,6 +103,9 @@ public void setScheduler(Scheduler scheduler) {
public List<JobDetails> start() {
List<JobDetails> backgroundJobs = new ArrayList<>();

// The CompressData job has been replaced by the TempDataCompressor job
unscheduleCompressData();

deleteTenant = new DeleteTenant(session, metricsService);

// Use a simple retry policy to make sure tenant deletion does complete in the event of failure. For now
@@ -121,9 +125,7 @@ public List<JobDetails> start() {
TempDataCompressor tempJob = new TempDataCompressor(metricsService, configurationService);
scheduler.register(TempDataCompressor.JOB_NAME, tempJob);

// CompressData compressDataJob = new CompressData(metricsService, configurationService);
// scheduler.register(CompressData.JOB_NAME, compressDataJob);
maybeScheduleCompressData(backgroundJobs);
maybeScheduleTempDataCompressor(backgroundJobs);

deleteExpiredMetrics = new DeleteExpiredMetrics(metricsService, session, configurationService,
this.metricExpirationDelay);
@@ -171,7 +173,31 @@ private void maybeScheduleTableCreator(List<JobDetails> backgroundJobs) {
}
}

private void maybeScheduleCompressData(List<JobDetails> backgroundJobs) {
private void unscheduleCompressData() {
Configuration config = configurationService.load(CompressData.CONFIG_ID).toBlocking()
.firstOrDefault(new Configuration(CompressData.CONFIG_ID, new HashMap<>()));
String jobId = config.get("jobId");

if (config.getProperties().isEmpty()) {
// This means we have a new installation and not an upgrade. The CompressData job has not been previously
// installed so there is no db clean up necessary.
} else {
Completable unscheduled;
if (jobId == null) {
logger.warnf("Expected to find a jobId property in database for %s. Attempting to unschedule job by " +
"name.", CompressData.JOB_NAME);
unscheduled = scheduler.unscheduleJobByTypeAndName(CompressData.JOB_NAME, CompressData.JOB_NAME);
} else {
unscheduled = scheduler.unscheduleJobById(jobId);
}
unscheduled.await();
if (!config.getProperties().isEmpty()) {
configurationService.delete(CompressData.CONFIG_ID).await();
}
}
}

private void maybeScheduleTempDataCompressor(List<JobDetails> backgroundJobs) {
String configId = TempDataCompressor.CONFIG_ID;
Configuration config = configurationService.load(configId).toBlocking()
.firstOrDefault(new Configuration(configId, new HashMap<>()));
@@ -183,13 +209,6 @@ private void maybeScheduleCompressData(List<JobDetails> backgroundJobs) {
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli();

// CompressData
// JobDetails jobDetails = scheduler.scheduleJob(CompressData.JOB_NAME, CompressData.JOB_NAME,
// ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
// .withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
// backgroundJobs.add(jobDetails);
// configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();

// Temp table processing
JobDetails jobDetails = scheduler.scheduleJob(TempDataCompressor.JOB_NAME, TempDataCompressor.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
@@ -220,10 +239,11 @@ private void maybeScheduleMetricExpirationJob(List<JobDetails> backgroundJobs) {
if (configuredJobFrequency == null || configuredJobFrequency != this.metricExpirationJobFrequencyInDays
|| this.metricExpirationJobFrequencyInDays <= 0 || configuredJobFrequency <= 0 ||
!this.metricExpirationJobEnabled) {
scheduler.unscheduleJob(config.get(jobIdConfigKey)).await();
configurationService.delete(configId, jobIdConfigKey).toBlocking();
scheduler.unscheduleJobById(config.get(jobIdConfigKey)).await();
configurationService.delete(configId, jobIdConfigKey)
.concatWith(configurationService.delete(configId, jobFrequencyKey))
.await();
config.delete(jobIdConfigKey);
configurationService.delete(configId, jobFrequencyKey).toBlocking();
config.delete(jobFrequencyKey);
}
}
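
The concatWith(...).await() chain in maybeScheduleMetricExpirationJob above runs the two configuration deletes one after the other and then blocks until both finish. A self-contained sketch of that composition, with Completable.fromAction stubs standing in for the Cassandra-backed deletes:

import rx.Completable;

public class SequentialDeleteSketch {
    public static void main(String[] args) {
        // Stubs for configurationService.delete(configId, jobIdConfigKey) and delete(configId, jobFrequencyKey).
        Completable deleteJobId = Completable.fromAction(() -> System.out.println("deleted jobId property"));
        Completable deleteFrequency = Completable.fromAction(() -> System.out.println("deleted jobFrequency property"));

        // concatWith subscribes to the second Completable only after the first completes;
        // await() then blocks the calling thread until the whole chain finishes or errors.
        deleteJobId.concatWith(deleteFrequency).await();
    }
}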
@@ -47,7 +47,9 @@ public interface Scheduler {
* @param jobId
* @return Completable instance
*/
Completable unscheduleJob(String jobId);
Completable unscheduleJobById(String jobId);

Completable unscheduleJobByTypeAndName(String jobType, String jobName);

/**
* Register a function that produces a job of the specified type. This method should be called prior to scheduling
@@ -75,8 +75,12 @@ public JobsService(RxSession session) {
findScheduledForTime = session.getSession().prepare(
"SELECT job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " +
"WHERE time_slice = ?");

// In general this is not a good way to execute queries in Cassandra; however, the number of partitions with
// which we are dealing is going to be very small. The Cassandra clusters are also generally only two or three nodes.
findAllScheduled = session.getSession().prepare(
"SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx");

update = session.getSession().prepare(
"INSERT INTO jobs (id, type, name, params, trigger) VALUES (?, ?, ?, ?, ?)");
insertScheduled = session.getSession().prepare(
@@ -100,6 +104,18 @@ public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler sche
.concatWith(Observable.just(currentTime));
}

public Observable<JobDetailsImpl> findAllScheduledJobs(rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.map(row -> createJobDetails(
row.getUUID(1),
row.getString(2),
row.getString(3),
row.getMap(4, String.class, String.class),
getTrigger(row.getUDTValue(5)),
JobStatus.fromCode(row.getByte(6)),
row.getTimestamp(0)));
}

public Observable<JobDetailsImpl> findJobs(Date timeSlice, rx.Scheduler scheduler) {
return findActiveTimeSlices(timeSlice, scheduler)
.flatMap(time -> findScheduledJobsForTime(time, scheduler))
@@ -245,10 +245,18 @@ public Single<? extends JobDetails> scheduleJob(String type, String name, Map<St
}

@Override
public Completable unscheduleJob(String jobId) {
public Completable unscheduleJobById(String jobId) {
return jobsService.deleteJob(UUID.fromString(jobId), queryScheduler);
}

@Override
public Completable unscheduleJobByTypeAndName(String jobType, String jobName) {
return Completable.merge(
jobsService.findAllScheduledJobs(queryScheduler)
.filter(details -> details.getJobType().equals(jobType) && details.getJobName().equals(jobName))
.map(details -> jobsService.deleteJob(details.getJobId(), queryScheduler))
);
}

@Override
public void start() {
running = true;
@@ -390,7 +398,9 @@ private Completable executeJob(JobDetailsImpl details, Date timeSlice, Set<UUID>
Func1<JobDetails, Completable> factory = jobFactories.get(details.getJobType());
Completable job;

if (details.getStatus() == JobStatus.FINISHED) {
if (factory == null) {
job = Completable.error(new UnregisteredJobException(details, timeSlice));
} else if (details.getStatus() == JobStatus.FINISHED) {
Observable<Completable> observable = jobsService.findScheduledExecutions(details.getJobId(), queryScheduler)
.filter(scheduledExecution -> scheduledExecution.getJobDetails().getStatus() == JobStatus.NONE &&
scheduledExecution.getJobDetails().getTrigger().getTriggerTime() >
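
unscheduleJobByTypeAndName above does a full scan of the scheduled jobs, filters by type and name on the client side, and merges the per-job deletions into a single Completable. A self-contained sketch of the Completable.merge composition, with a stubbed Observable of job ids standing in for jobsService.findAllScheduledJobs(...):

import java.util.UUID;

import rx.Completable;
import rx.Observable;

public class MergeUnscheduleSketch {
    public static void main(String[] args) {
        // Stand-in for the stream of scheduled jobs that match the requested type and name.
        Observable<UUID> matchingJobIds = Observable.just(UUID.randomUUID(), UUID.randomUUID());

        // merge subscribes to every inner Completable and completes once all of them have completed,
        // so the individual deletes are not forced to run strictly one after another.
        Completable unscheduled = Completable.merge(
                matchingJobIds.map(id -> Completable.fromAction(() -> System.out.println("deleted job " + id))));

        unscheduled.await();
    }
}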
@@ -135,8 +135,13 @@ public void register(String jobType, Func1<JobDetails, Completable> jobProducer,
}

@Override
public Completable unscheduleJob(String jobId) {
return scheduler.unscheduleJob(jobId);
public Completable unscheduleJobById(String jobId) {
return scheduler.unscheduleJobById(jobId);
}

@Override
public Completable unscheduleJobByTypeAndName(String jobType, String jobName) {
return scheduler.unscheduleJobByTypeAndName(jobType, jobName);
}

@Override
@@ -0,0 +1,32 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.scheduler.impl;

import java.util.Date;

import org.hawkular.metrics.scheduler.api.JobDetails;

/**
* @author jsanda
*/
public class UnregisteredJobException extends RuntimeException {

public UnregisteredJobException(JobDetails jobDetails, Date timeSlice) {
super("No factory has been registered for " + jobDetails + ". It cannot be executed for time slice [" +
timeSlice + "]");
}
}
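
UnregisteredJobException is what executeJob above now signals when no factory is registered for a job's type, presumably the case an already-persisted CompressData execution would hit once its factory is no longer registered. A self-contained sketch of that guard, with a plain RuntimeException and a String job type standing in for UnregisteredJobException and JobDetails:

import java.util.HashMap;
import java.util.Map;

import rx.Completable;
import rx.functions.Func1;

public class MissingFactorySketch {
    public static void main(String[] args) {
        // Registered factories keyed by job type; CompressData is deliberately absent.
        Map<String, Func1<String, Completable>> jobFactories = new HashMap<>();
        jobFactories.put("TempDataCompressor", jobName -> Completable.complete());

        String jobType = "CompressData";
        Func1<String, Completable> factory = jobFactories.get(jobType);

        // A missing factory becomes an error signal on the Completable instead of a
        // NullPointerException when the factory is invoked later.
        Completable job = factory == null
                ? Completable.error(new RuntimeException("No factory has been registered for " + jobType))
                : factory.call(jobType);

        try {
            job.await();
        } catch (RuntimeException e) {
            System.out.println("job failed: " + e.getMessage());
        }
    }
}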
