Skip to content

Commit

Permalink
[HWKMETRICS-648] add supporting for modifying and persisting job para…
Browse files Browse the repository at this point in the history
…meters
  • Loading branch information
John Sanda committed Apr 20, 2017
1 parent 149f49d commit 8abb44c
Show file tree
Hide file tree
Showing 15 changed files with 587 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public DeleteExpiredMetrics(MetricsService metricsService, RxSession session,
}

@Override
public Completable call(JobDetails jobDetails) {
public Completable call(JobDetails JobDetails) {
logger.info("Starting delete expired metrics job");
Stopwatch stopwatch = Stopwatch.createStarted();

String unparsedConfigExpirationTime = jobDetails.getParameters().get("expirationTimestamp");
String unparsedConfigExpirationTime = JobDetails.getParameters().get("expirationTimestamp");
Long configuredExpirationTime = null;
if (unparsedConfigExpirationTime != null && !unparsedConfigExpirationTime.isEmpty()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ private void maybeScheduleCompressData(List<JobDetails> backgroundJobs) {
long nextStart = LocalDateTime.now(ZoneOffset.UTC)
.with(DateTimeService.startOfNextOddHour())
.toInstant(ZoneOffset.UTC).toEpochMilli();
JobDetails jobDetails = scheduler.scheduleJob(CompressData.JOB_NAME, CompressData.JOB_NAME,
JobDetails JobDetails = scheduler.scheduleJob(CompressData.JOB_NAME, CompressData.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(2, TimeUnit.HOURS).build()).toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, "jobId", jobDetails.getJobId().toString()).toBlocking();
backgroundJobs.add(JobDetails);
configurationService.save(configId, "jobId", JobDetails.getJobId().toString()).toBlocking();

logger.info("Created and scheduled " + jobDetails);
logger.info("Created and scheduled " + JobDetails);
}
}

Expand Down Expand Up @@ -197,16 +197,16 @@ private void maybeScheduleMetricExpirationJob(List<JobDetails> backgroundJobs) {

//Get start of next day
long nextStart = DateTimeService.current24HourTimeSlice().plusDays(1).getMillis();
JobDetails jobDetails = scheduler.scheduleJob(DeleteExpiredMetrics.JOB_NAME, DeleteExpiredMetrics.JOB_NAME,
JobDetails JobDetails = scheduler.scheduleJob(DeleteExpiredMetrics.JOB_NAME, DeleteExpiredMetrics.JOB_NAME,
ImmutableMap.of(), new RepeatingTrigger.Builder().withTriggerTime(nextStart)
.withInterval(this.metricExpirationJobFrequencyInDays, TimeUnit.DAYS).build())
.toBlocking().value();
backgroundJobs.add(jobDetails);
configurationService.save(configId, jobIdConfigKey, jobDetails.getJobId().toString()).toBlocking();
backgroundJobs.add(JobDetails);
configurationService.save(configId, jobIdConfigKey, JobDetails.getJobId().toString()).toBlocking();
configurationService.save(configId, jobFrequencyKey, this.metricExpirationJobFrequencyInDays + "")
.toBlocking();

logger.info("Created and scheduled " + jobDetails);
logger.info("Created and scheduled " + JobDetails);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void initClass() {
@BeforeMethod(alwaysRun = true)
public void initMethod() {
session.execute("TRUNCATE tenants");
session.execute("TRUNCATE data");
// session.execute("TRUNCATE data");
session.execute("TRUNCATE data_compressed");
session.execute("TRUNCATE metrics_idx");
session.execute("TRUNCATE retentions_idx");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -16,99 +16,47 @@
*/
package org.hawkular.metrics.scheduler.api;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import com.google.common.base.MoreObjects;
import rx.functions.Func1;
import rx.functions.Func2;

/**
* Provides information about scheduled jobs.
*
* @author jsanda
*/
public class JobDetails {

private UUID jobId;

private String jobType;

private String jobName;

private Map<String, String> parameters;

private Trigger trigger;

private JobStatus status;

public JobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters, Trigger trigger) {
this.jobId = jobId;
this.jobType = jobType;
this.jobName = jobName;
this.parameters = Collections.unmodifiableMap(parameters);
this.trigger = trigger;
status = JobStatus.NONE;
}

public JobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters, Trigger trigger,
JobStatus status) {
this.jobId = jobId;
this.jobType = jobType;
this.jobName = jobName;
this.parameters = Collections.unmodifiableMap(parameters);
this.trigger = trigger;
this.status = status;
}

public UUID getJobId() {
return jobId;
}

public String getJobType() {
return jobType;
}

public String getJobName() {
return jobName;
}

public Map<String, String> getParameters() {
return parameters;
}

public Trigger getTrigger() {
return trigger;
}

public JobStatus getStatus() {
return status;
}

@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobDetails details = (JobDetails) o;
return Objects.equals(jobId, details.jobId) &&
Objects.equals(jobType, details.jobType) &&
Objects.equals(jobName, details.jobName) &&
Objects.equals(parameters, details.parameters) &&
Objects.equals(trigger, details.trigger) &&
Objects.equals(status, details.status);
}

@Override public int hashCode() {
return Objects.hash(jobId, jobType, jobName, parameters, trigger, status);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("jobId", jobId)
.add("jobType", jobType)
.add("jobName", jobName)
.add("parameters", parameters)
.add("trigger", trigger)
.add("status", status)
.omitNullValues()
.toString();
}
public interface JobDetails {

/**
* A unique identifier that the scheduler uses to query Cassandra for the job details
*/
UUID getJobId();

/**
* Every job has a type. The scheduler uses the type to determine who is responsible for the job execution.
* @see Scheduler#register(String, Func1)
* @see Scheduler#register(String, Func1, Func2)
*/
String getJobType();

/**
* Note that thee job name does not have to be unique.
*/
String getJobName();

/**
* The job {@link JobParameters parameters} which are mutable
*/
JobParameters getParameters();

/**
* The {@link Trigger trigger} specifies when the job will execute.
*/
Trigger getTrigger();

/**
* This is primarily for internal use by the scheduler.
*/
JobStatus getStatus();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.scheduler.api;

import java.util.Map;

import rx.Completable;

/**
* Provides a map-like key/value API for job parameters. The parameters are mutable. Changes will be persisted after
* job execution completes (for repeating jobs). Changes can also be persisted during execution using the
* {@link #save() save} method.
*
* @author jsanda
*/
public interface JobParameters {

/**
* Return the value associated with the key or null if there is no such parameter
*/
String get(String key);

/**
* Associates the value with the key and returns the old value if there previously was a mapping for the key
*/
String put(String key, String value);

/**
* Removes the value associated with the key or null if there is no such parameter
*/
String remove(String key);

/**
* Return true if the parameters contain a value for the key
*/
boolean containsKey(String key);

/**
* Return an immutable map of the parameters. Note that this map is a copy. Any changes made to the parameters
* through methods like {@link #get(String) get} or {@link #put(String, String) put} will not be reflected in
* this map.
*/
Map<String, String> getMap();

/**
* Asynchronously save the parameters back to Cassandra. For reoccurring jobs any changes to parameters will
* automatically be persisted when the job finishes and is rescheduled for its next run. This method can be useful
* for long running jobs that perform a lot of work. It can be used to create checkpoints so that if a job is
* abruptly stopped and restarted it can resume its work from that checkpoint rather than starting all over.
*/
Completable save();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface Scheduler {
* @param trigger
* @return A Single that emits the job details
*/
Single<JobDetails> scheduleJob(String type, String name, Map<String, String> parameters, Trigger trigger);
Single<? extends JobDetails> scheduleJob(String type, String name, Map<String, String> parameters, Trigger trigger);

/**
* Deletes all the scheduled execution for a job id.
Expand Down
Loading

0 comments on commit 8abb44c

Please sign in to comment.