Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.9
  • Loading branch information
erilong committed Aug 20, 2018
2 parents 8d0df05 + 56b35c6 commit 2fcafce
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 39 deletions.
Expand Up @@ -372,10 +372,11 @@ public String getDeprecatedStartParameter() {
}

@Override
public void startJobsAfterConfigChange() {
// No action on Android
public void restartJobs() {
this.stopJobs();
this.startJobs();
}

@Override
public void init() {
// No action on Android
Expand Down
Expand Up @@ -72,13 +72,7 @@ protected Map<String, String> createSqlReplacementTokens() {

@Override
public void init() {
if (this.jobs != null && !this.jobs.isEmpty()) {
for (IJob job : jobs) {
if (job.isStarted()) {
job.stop();
}
}
}
this.stopJobs();
List<JobDefinition> jobDefitions = loadJobs(engine);

BuiltInJobs builtInJobs = new BuiltInJobs();
Expand Down Expand Up @@ -138,17 +132,7 @@ public boolean isJobApplicableToNodeGroup(IJob job) {
return engine.getParameterService().getNodeGroupId().equals(nodeGroupId);
}

@Override
public synchronized void startJobsAfterConfigChange() {
if (jobs != null) {
for (IJob job : jobs) {
if (isAutoStartConfigured(job) && !job.isStarted()) {
job.start();
}
}
}
}


protected boolean isAutoStartConfigured(IJob job) {
String autoStartValue = null;

Expand Down Expand Up @@ -210,6 +194,12 @@ public int compare(IJob job1, IJob job2) {
}
return jobsSorted;
}

@Override
public void restartJobs() {
this.init();
this.startJobs();
}

@Override
public void saveJob(JobDefinition job) {
Expand All @@ -220,8 +210,7 @@ public void saveJob(JobDefinition job) {
if (sqlTemplate.update(getSql("updateJobSql"), args) <= 0) {
sqlTemplate.update(getSql("insertJobSql"), args);
}
init();
startJobsAfterConfigChange();
restartJobs();
}

@Override
Expand All @@ -233,7 +222,6 @@ public void removeJob(String name) {
} else {
throw new SymmetricException("Failed to remove job " + name + ". Note that BUILT_IN jobs cannot be removed.");
}
init();
startJobsAfterConfigChange();
restartJobs();
}
}
Expand Up @@ -1022,7 +1022,7 @@ public void clearCaches() {
getNodeService().flushNodeAuthorizedCache();
getNodeService().flushNodeCache();
getNodeService().flushNodeGroupCache();
getJobManager().startJobsAfterConfigChange();
getJobManager().restartJobs();
getLoadFilterService().clearCache();
getMonitorService().flushMonitorCache();
getMonitorService().flushNotificationCache();
Expand Down
Expand Up @@ -36,8 +36,6 @@ public interface IJobManager {

public void stopJobs();

public void startJobsAfterConfigChange();

public void destroy();

public List<IJob> getJobs();
Expand All @@ -52,4 +50,6 @@ public interface IJobManager {

public boolean isJobApplicableToNodeGroup(IJob job);

public void restartJobs();

}
Expand Up @@ -315,8 +315,7 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
IJobManager jobManager = engine.getJobManager();
if (jobManager != null && jobManager.isStarted()) {
log.info("About to restart jobs because new configuration came through the data loader");
jobManager.init();
jobManager.startJobsAfterConfigChange();
jobManager.restartJobs();
}
context.remove(CTX_KEY_RESTART_JOBMANAGER_NEEDED);
}
Expand Down
Expand Up @@ -411,7 +411,7 @@ public void setStatus(Status status) {
this.status = status;
}

public void setStatus(String status) {
public void setStatusFromString(String status) {
try {
this.status = Status.valueOf(status);
} catch (IllegalArgumentException e) {
Expand Down
Expand Up @@ -634,8 +634,7 @@ public void contextCommitted(SimpleRouterContext routingContext) {
IJobManager jobManager = engine.getJobManager();
if (jobManager != null) {
log.info("About to restart jobs because new configuration come through the data router");
jobManager.init();
jobManager.startJobsAfterConfigChange();
jobManager.restartJobs();
}
}

Expand Down
Expand Up @@ -490,7 +490,7 @@ public IncomingBatch mapRow(Row rs) {
batch.setBatchId(rs.getLong("batch_id"));
batch.setNodeId(rs.getString("node_id"));
batch.setChannelId(rs.getString("channel_id"));
batch.setStatus(rs.getString("status"));
batch.setStatusFromString(rs.getString("status"));
batch.setRouterMillis(rs.getLong("router_millis"));
batch.setNetworkMillis(rs.getLong("network_millis"));
batch.setFilterMillis(rs.getLong("filter_millis"));
Expand Down
Expand Up @@ -1087,7 +1087,7 @@ public OutgoingBatch mapRow(Row rs) {
if (channel != null && (includeDisabledChannels || channel.isEnabled())) {
OutgoingBatch batch = new OutgoingBatch();
batch.setNodeId(rs.getString("node_id"));
batch.setStatus(rs.getString("status"));
batch.setStatusFromString(rs.getString("status"));
batch.setBatchId(rs.getLong("batch_id"));
if (!statusOnly) {
batch.setChannelId(channelId);
Expand Down
Expand Up @@ -601,7 +601,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost,
}

if (isNotBlank(remoteHost)) {
NodeHost nodeHost = new NodeHost(node.getNodeId(), engine.getClusterService().getInstanceId());
NodeHost nodeHost = new NodeHost(node.getNodeId(), null);
nodeHost.setHeartbeatTime(new Date());
nodeHost.setIpAddress(remoteAddress);
nodeHost.setHostName(remoteHost);
Expand Down Expand Up @@ -658,7 +658,7 @@ protected String openRegistration(Node node, String remoteHost, String remoteAdd
nodeId, password, masterToMasterOnly ? null : me.getNodeId() });

if (isNotBlank(remoteHost)) {
NodeHost nodeHost = new NodeHost(node.getNodeId(), engine.getClusterService().getInstanceId());
NodeHost nodeHost = new NodeHost(node.getNodeId(), null);
nodeHost.setHeartbeatTime(new Date());
nodeHost.setIpAddress(remoteAddress);
nodeHost.setHostName(remoteHost);
Expand Down
Expand Up @@ -520,8 +520,10 @@ public static void write(Table table, Writer output) {
for (ForeignKey fk : table.getForeignKeys()) {
output.write("\t\t<foreign-key name=\"" + StringEscapeUtils.escapeXml(fk.getName()) + "\" foreignTable=\""
+ StringEscapeUtils.escapeXml(fk.getForeignTableName()) + "\" foreignTableCatalog=\""
+ StringEscapeUtils.escapeXml(fk.getForeignTableCatalog() == null ? "" : fk.getForeignTableCatalog()) +
"\" foreignTableSchema=\"" + StringEscapeUtils.escapeXml(fk.getForeignTableSchema() == null ? "" : fk.getForeignTableSchema()) + "\">\n");
+ StringEscapeUtils.escapeXml(fk.getForeignTableCatalog() == null || fk.getForeignTableCatalog().equals(table.getOldCatalog())
? "" : fk.getForeignTableCatalog()) +
"\" foreignTableSchema=\"" + StringEscapeUtils.escapeXml(fk.getForeignTableSchema() == null ||
fk.getForeignTableSchema().equals(table.getOldSchema()) ? "" : fk.getForeignTableSchema()) + "\">\n");

for (Reference ref : fk.getReferences()) {
output.write("\t\t\t<reference local=\"" + StringEscapeUtils.escapeXml(ref.getLocalColumnName())
Expand Down

0 comments on commit 2fcafce

Please sign in to comment.