Skip to content

Commit

Permalink
Execution capabilities: limiting trigger acquisition to a set of nodes.
Browse files Browse the repository at this point in the history
(cherry picked from commit 58c3802)
  • Loading branch information
mederly committed Jul 31, 2017
1 parent 5b5a0a0 commit 5b078fd
Show file tree
Hide file tree
Showing 37 changed files with 380 additions and 144 deletions.
6 changes: 6 additions & 0 deletions quartz-core/src/main/java/org/quartz/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,10 @@ public int compare(Trigger t1, Trigger t2) {
return compare(t1.getNextFireTime(), t1.getPriority(), t1.getKey(), t2.getNextFireTime(), t2.getPriority(), t2.getKey());
}
}

/**
* Execution capability that must be possessed by a node in order to be able to acquire this trigger.
* If null, any node can do that.
*/
String getRequiredCapability();
}
16 changes: 15 additions & 1 deletion quartz-core/src/main/java/org/quartz/TriggerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class TriggerBuilder<T extends Trigger> {
private String calendarName;
private JobKey jobKey;
private JobDataMap jobDataMap = new JobDataMap();
private String requiredCapability;

private ScheduleBuilder<?> scheduleBuilder = null;

Expand Down Expand Up @@ -113,7 +114,8 @@ public T build() {

if(!jobDataMap.isEmpty())
trig.setJobDataMap(jobDataMap);


trig.setRequiredCapability(requiredCapability);
return (T) trig;
}

Expand Down Expand Up @@ -412,5 +414,17 @@ public TriggerBuilder<T> usingJobData(JobDataMap newJobDataMap) {
jobDataMap = newJobDataMap; // set new map as the map to use
return this;
}

/**
* Set the node's execution capability that is required for the Trigger to be acquired
* at a given node.
* @param requiredCapability the required capability
* @return the updated TriggerBuilder
*/

public TriggerBuilder<T> requiredCapability(String requiredCapability) {
this.requiredCapability = requiredCapability;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public interface Constants {

String COL_MISFIRE_INSTRUCTION = "MISFIRE_INSTR";

String COL_REQUIRED_CAP = "REQUIRED_CAP";

String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM";

// TABLE_SIMPLE_TRIGGERS columns names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -935,13 +936,13 @@ Key<?> selectTriggerForFireTime(Connection conn, long fireTime)
* @param conn
* the DB Connection
* @param noLaterThan
* highest value of <code>getNextFireTime()</code> of the triggers (exclusive)
* highest value of <code>getNextFireTime()</code> of the triggers (exclusive) TODO doesn't match the SQL code
* @param noEarlierThan
* highest value of <code>getNextFireTime()</code> of the triggers (inclusive)
* lowest value of <code>getNextFireTime()</code> of the triggers (inclusive)
*
* @return A (never null, possibly empty) list of the identifiers (Key objects) of the next triggers to be fired.
*
* @deprecated - This remained for compatibility reason. Use {@link #selectTriggerToAcquire(Connection, long, long, int)} instead.
* @deprecated - This remained for compatibility reason. Use {@link #selectTriggerToAcquire(Connection, long, long, int, Collection)} instead.
*/
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan)
throws SQLException;
Expand All @@ -955,15 +956,18 @@ public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan
* @param conn
* the DB Connection
* @param noLaterThan
* highest value of <code>getNextFireTime()</code> of the triggers (exclusive)
* @param noEarlierThan
* highest value of <code>getNextFireTime()</code> of the triggers (inclusive)
* @param maxCount
* highest value of <code>getNextFireTime()</code> of the triggers (exclusive) TODO doesn't match the SQL code
* @param noEarlierThan
* lowest value of <code>getNextFireTime()</code> of the triggers (inclusive)
* @param maxCount
* maximum number of trigger keys allow to acquired in the returning list.
*
* @param executionCapabilities
* capabilities of the current node: we select only triggers that have requiredCapability either null or
* matching some of the executionCapabilities
* @return A (never null, possibly empty) list of the identifiers (Key objects) of the next triggers to be fired.
*/
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount,
Collection<String> executionCapabilities)
throws SQLException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

import org.quartz.Calendar;
import org.quartz.Job;
Expand Down Expand Up @@ -102,6 +95,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected String instanceId;

protected String instanceName;

private Set<String> executionCapabilitiesParsed = new HashSet<>();

protected String executionCapabilities; // comma-separated values (in order to be settable by properties)

protected String delegateClassName;

Expand Down Expand Up @@ -258,6 +255,29 @@ public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}

public Collection<String> getExecutionCapabilitiesParsed() {
return Collections.unmodifiableSet(executionCapabilitiesParsed);
}

public String getExecutionCapabilities() {
return executionCapabilities;
}

// invoked by reflection
// stringValue contains capabilities separated by comma
public void setExecutionCapabilities(String stringValue) {
executionCapabilities = stringValue;
executionCapabilitiesParsed.clear();
if (stringValue != null) {
for (String capability : stringValue.split(",")) {
capability = capability.trim();
if (!capability.isEmpty()) {
executionCapabilitiesParsed.add(capability);
}
}
}
}

public void setThreadPoolSize(final int poolSize) {
//
}
Expand Down Expand Up @@ -2841,7 +2861,7 @@ protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLater
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount, getExecutionCapabilitiesParsed());

// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@

import org.quartz.Calendar;
import org.quartz.JobDetail;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.OperableTrigger;
import org.slf4j.Logger;

/**
* <p>
Expand Down Expand Up @@ -183,6 +181,7 @@ public int insertTrigger(Connection conn, OperableTrigger trigger, String state,
ps.setInt(13, trigger.getMisfireInstruction());
ps.setBinaryStream(14, bais, len);
ps.setInt(15, trigger.getPriority());
ps.setString(16, trigger.getRequiredCapability());

insertResult = ps.executeUpdate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public interface StdJDBCConstants extends Constants {
// table prefix substitution string
String SCHED_NAME_SUBST = "{1}";

// additional substitution string (currently used for 'required capability' clause
// but might be useful for other things in the future)
String ADDITIONAL_SUBST = "{2}";

// QUERIES
String UPDATE_TRIGGER_STATES_FROM_OTHER_STATES = "UPDATE "
+ TABLE_PREFIX_SUBST
Expand Down Expand Up @@ -212,8 +216,8 @@ public interface StdJDBCConstants extends Constants {
+ ", " + COL_NEXT_FIRE_TIME + ", " + COL_PREV_FIRE_TIME + ", "
+ COL_TRIGGER_STATE + ", " + COL_TRIGGER_TYPE + ", "
+ COL_START_TIME + ", " + COL_END_TIME + ", " + COL_CALENDAR_NAME
+ ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ") "
+ " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ ", " + COL_MISFIRE_INSTRUCTION + ", " + COL_JOB_DATAMAP + ", " + COL_PRIORITY + ", " + COL_REQUIRED_CAP + ") "
+ " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

String INSERT_SIMPLE_TRIGGER = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_SIMPLE_TRIGGERS + " ("
Expand Down Expand Up @@ -242,8 +246,9 @@ public interface StdJDBCConstants extends Constants {
+ COL_PREV_FIRE_TIME + " = ?, " + COL_TRIGGER_STATE + " = ?, "
+ COL_TRIGGER_TYPE + " = ?, " + COL_START_TIME + " = ?, "
+ COL_END_TIME + " = ?, " + COL_CALENDAR_NAME + " = ?, "
+ COL_MISFIRE_INSTRUCTION + " = ?, " + COL_PRIORITY
+ " = ? WHERE "
+ COL_MISFIRE_INSTRUCTION + " = ?, " + COL_PRIORITY + " = ?, "
+ COL_REQUIRED_CAP
+ " = ? WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME
+ " = ? AND " + COL_TRIGGER_GROUP + " = ?";
Expand All @@ -255,7 +260,8 @@ public interface StdJDBCConstants extends Constants {
+ COL_PREV_FIRE_TIME + " = ?, " + COL_TRIGGER_STATE + " = ?, "
+ COL_TRIGGER_TYPE + " = ?, " + COL_START_TIME + " = ?, "
+ COL_END_TIME + " = ?, " + COL_CALENDAR_NAME + " = ?, "
+ COL_MISFIRE_INSTRUCTION + " = ?, " + COL_PRIORITY + " = ?, "
+ COL_MISFIRE_INSTRUCTION + " = ?, " + COL_PRIORITY + " = ?, "
+ COL_REQUIRED_CAP + " = ?, "
+ COL_JOB_DATAMAP + " = ? WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";
Expand Down Expand Up @@ -513,6 +519,7 @@ public interface StdJDBCConstants extends Constants {
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
+ "AND (" + COL_REQUIRED_CAP + " IS NULL " + ADDITIONAL_SUBST + ") "
+ "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";


Expand Down

0 comments on commit 5b078fd

Please sign in to comment.