Skip to content

Commit

Permalink
0002684: Support Custom Jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed May 10, 2017
1 parent 20f9e04 commit c3e2730
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 5 deletions.
Expand Up @@ -79,6 +79,7 @@ protected void setBuiltInDefaults(IJob argBuiltInJob) {
jobDefinition.setRequiresRegistration(builtInJob.getDefaults().isRequiresRegisteration());
jobDefinition.setJobType(JobType.BUILT_IN);
jobDefinition.setJobExpression(argBuiltInJob.getClass().getName());
jobDefinition.setNodeGroupId("ALL");
jobDefinition.setCreateBy("SymmetricDS");
builtInJob.setJobDefinition(jobDefinition);
}
Expand Down
Expand Up @@ -119,7 +119,7 @@ public IJob getJob(String name) {
@Override
public synchronized void startJobs() {
for (IJob job : jobs) {
if (isAutoStartConfigured(job)) {
if (isAutoStartConfigured(job) && isJobApplicableToNodeGroup(job)) {
job.start();
} else {
log.info("Job {} not configured for auto start", job.getName());
Expand All @@ -128,6 +128,15 @@ public synchronized void startJobs() {
started = true;
}

@Override
public boolean isJobApplicableToNodeGroup(IJob job) {
String nodeGroupId = job.getJobDefinition().getNodeGroupId();
if (StringUtils.isEmpty(nodeGroupId) || nodeGroupId.equals("ALL")) {
return true;
}

return engine.getParameterService().getNodeGroupId().equals(nodeGroupId);
}

@Override
public synchronized void startJobsAfterConfigChange() {
Expand Down Expand Up @@ -202,7 +211,7 @@ public int compare(IJob job1, IJob job2) {
public void saveJob(JobDefinition job) {
Object[] args = { job.getDescription(), job.getJobType().toString(),
job.getJobExpression(), job.isDefaultAutomaticStartup(), job.getDefaultSchedule(),
job.getCreateBy(), job.getLastUpdateBy(), job.getJobName() };
job.getNodeGroupId(), job.getCreateBy(), job.getLastUpdateBy(), job.getJobName() };

if (sqlTemplate.update(getSql("updateJobSql"), args) == 0) {
sqlTemplate.update(getSql("insertJobSql"), args);
Expand Down
Expand Up @@ -32,12 +32,13 @@ public JobManagerSqlMap(IDatabasePlatform platform, Map<String, String> replacem
putSql("loadCustomJobs",
"select * from $(job) order by job_type, job_name");

putSql("insertJobSql", "insert into $(job) (description, job_type, job_expression, default_auto_start, default_schedule, "
putSql("insertJobSql", "insert into $(job) (description, job_type, job_expression, "
+ "default_auto_start, default_schedule, node_group_id, "
+ "create_by, create_time, last_update_by, last_update_time, job_name) " +
"values (?, ?, ?, ?, ?, ?, current_timestamp, ?, current_timestamp, ?)");
"values (?, ?, ?, ?, ?, ?, ?, current_timestamp, ?, current_timestamp, ?)");

putSql("updateJobSql", "update $(job) set description = ?, job_type = ?, job_expression = ?, "
+ "default_auto_start = ?, default_schedule = ?, "
+ "default_auto_start = ?, default_schedule = ?, node_group_id = ?, "
+ "create_by = ?, last_update_by = ?, last_update_time = current_timestamp "
+ "where job_name = ?");

Expand Down
Expand Up @@ -38,6 +38,7 @@ public JobDefinition mapRow(Row row) {
jobDefinition.setDefaultAutomaticStartup(row.getBoolean("default_auto_start"));
jobDefinition.setDefaultSchedule(row.getString("default_schedule"));
jobDefinition.setDescription(row.getString("description"));
jobDefinition.setNodeGroupId(row.getString("node_group_id"));
jobDefinition.setCreateBy(row.getString("create_by"));
jobDefinition.setCreateTime(row.getDateTime("create_time"));
jobDefinition.setLastUpdateBy(row.getString("last_update_by"));
Expand Down
Expand Up @@ -50,4 +50,6 @@ public interface IJobManager {

public boolean isStarted();

public boolean isJobApplicableToNodeGroup(IJob job);

}
Expand Up @@ -39,6 +39,7 @@ public enum JobType {BUILT_IN, BSH, JAVA, SQL}
private Date lastUpdateTime;
private boolean defaultAutomaticStartup;
private String defaultSchedule;
private String nodeGroupId;
private transient boolean automaticStartup;
private transient String schedule;

Expand Down Expand Up @@ -121,6 +122,14 @@ public void setDefaultSchedule(String defaultSchedule) {
this.defaultSchedule = defaultSchedule;
}

public String getNodeGroupId() {
return nodeGroupId;
}

public void setNodeGroupId(String nodeGroupId) {
this.nodeGroupId = nodeGroupId;
}

public String getSchedule() {
return schedule;
}
Expand Down
Expand Up @@ -95,6 +95,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement

final String CTX_KEY_FLUSHED_TRIGGER_ROUTERS = "FlushedTriggerRouters."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_JOBS_NEEDED = "FlushJobs."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand Down Expand Up @@ -238,6 +241,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
if (tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION)) {
routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_JOB)) {
routingContext.put(CTX_KEY_FLUSH_JOBS_NEEDED, Boolean.TRUE);
}
}
}

Expand Down Expand Up @@ -613,6 +620,12 @@ public void contextCommitted(SimpleRouterContext routingContext) {
engine.getMonitorService().flushNotificationCache();
}

if (routingContext.get(CTX_KEY_FLUSH_JOBS_NEEDED) != null) {
log.info("About to reset the job manager because new configuration came through the data router");
engine.getJobManager().init();
engine.getJobManager().startJobs();
}

if (routingContext.get(CTX_KEY_FLUSH_NODES_NEEDED) != null) {
log.info("About to refresh the cache of nodes because new configuration came through the data router");
engine.getNodeService().flushNodeCache();
Expand Down
1 change: 1 addition & 0 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -880,6 +880,7 @@
<column name="description" type="VARCHAR" size="255" description="An optional description of the job for users of the system." />
<column name="default_schedule" type="VARCHAR" size="50" description="The schedule to use if no schedule parameter is found. Overridden by job.jobname.period.time.ms or job.jobname.cron." />
<column name="default_auto_start" type="BOOLEANINT" size="1" required="true" default="1" description="Determine if this job should auto start. Overridden by start.jobname.job." />
<column name="node_group_id" type="VARCHAR" size="50" required="true" description="Target the job at a specific node group id. To target all groups, use the value of 'ALL'." />
<column name="create_by" type="VARCHAR" size="50" description="The user who created this entry." />
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
Expand Down

0 comments on commit c3e2730

Please sign in to comment.