Skip to content

Commit

Permalink
Merge branch '3.12' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.12
  • Loading branch information
joshhicks committed May 28, 2021
2 parents f1cb6e6 + d652a67 commit 0f87d7c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
Expand Up @@ -185,12 +185,12 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
Table table = platform.getTableFromCache(catalogName, schemaName, tableName, false);
if (table != null) {
String quoteChar = platform.getDatabaseInfo().getDelimiterToken();
schemaName = table.getSchema() == null ? "" : (quoteChar + table.getSchema()
String schemaPrefix = table.getSchema() == null ? "" : (quoteChar + table.getSchema()
+ quoteChar + ".");
final String dropSql = "drop trigger IF EXISTS " + triggerName + " on " + schemaName + quoteChar
final String dropSql = "drop trigger IF EXISTS " + triggerName + " on " + schemaPrefix + quoteChar
+ table.getName() + quoteChar;
logSql(dropSql, sqlBuffer);
final String dropFunction = "drop function IF EXISTS " + schemaName + "f" + triggerName
final String dropFunction = "drop function IF EXISTS " + schemaPrefix + "f" + triggerName
+ "() cascade";
logSql(dropFunction, sqlBuffer);
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
Expand Down
Expand Up @@ -32,6 +32,8 @@ public interface INodeCommunicationService {

public List<NodeCommunication> list(CommunicationType communicationType);

public List<NodeCommunication> listAll(CommunicationType communicationType);

public NodeCommunication find(String nodeId, String channelId, CommunicationType communicationType);

public boolean execute(NodeCommunication nodeCommunication, RemoteNodeStatuses statuses, INodeCommunicationExecutor executor);
Expand Down
Expand Up @@ -2406,7 +2406,17 @@ protected String reloadTable(String nodeId, String catalogName, String schemaNam
for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
if (triggerRouters != null && triggerRouters.size() > 0) {

boolean hasTriggerRouters = triggerRouters != null && triggerRouters.size() > 0;

if (!hasTriggerRouters && triggerHistory.getSourceTableName().startsWith(parameterService.getTablePrefix())) {
if (overrideChannelId == null) {
overrideChannelId = Constants.CHANNEL_RELOAD;
}
hasTriggerRouters = triggerRouters.add(new TriggerRouter());
}

if (hasTriggerRouters) {
for (TriggerRouter triggerRouter : triggerRouters) {
eventCount++;
String channelId = overrideChannelId;
Expand Down
Expand Up @@ -165,6 +165,14 @@ protected List<NodeCommunication> find(CommunicationType communicationType) {
}

public List<NodeCommunication> list(CommunicationType communicationType) {
return list(communicationType, true);
}

public List<NodeCommunication> listAll(CommunicationType communicationType) {
return list(communicationType, false);
}

protected List<NodeCommunication> list(CommunicationType communicationType, boolean onlyNodesWithChanges) {
initialize();
long ts = System.currentTimeMillis();
List<NodeCommunication> communicationRows = find(communicationType);
Expand Down Expand Up @@ -230,7 +238,7 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
}
}

if (communicationType == CommunicationType.PUSH &&
if (communicationType == CommunicationType.PUSH && onlyNodesWithChanges &&
parameterService.getInt(ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER) < communicationRows.size()) {
ts = System.currentTimeMillis();
List<String> nodeIds = getNodeIdsWithUnsentCount();
Expand Down Expand Up @@ -533,6 +541,11 @@ public void run() {
boolean failed = false;
try {
MDC.put("engineName", parameterService.getEngineName());
String name = parameterService.getEngineName().toLowerCase() + "-" + nodeCommunication.getCommunicationType().name().toLowerCase() +
"-" + nodeCommunication.getQueue().toLowerCase();
Thread thread = Thread.currentThread();
thread.setName(thread.getName().replaceFirst(".*(-\\d+)", name + "$1"));

executor.execute(nodeCommunication, status);
failed = status.failed();
} catch (Throwable ex) {
Expand All @@ -551,7 +564,6 @@ public void run() {
} else {
ThreadPoolExecutor service = getExecutor(nodeCommunication.getCommunicationType(),
nodeCommunication.getQueue());
((ChannelThreadFactory) service.getThreadFactory()).setChannelThread(nodeCommunication.getQueue());
service.execute(r);
}
}
Expand Down Expand Up @@ -666,24 +678,14 @@ static class ChannelThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private String engineName;
private String communicationType;
private String channelThread;

public ChannelThreadFactory(String engineName, String communicationType) {
this.engineName = engineName;
this.communicationType = communicationType;
}

String getChannelThread() {
return channelThread != null ? this.channelThread : "default";
}

void setChannelThread(String channelThread) {
this.channelThread = channelThread;
}

public String getThreadPrefix() {
return new StringBuffer(engineName.toLowerCase()).append("-").append(communicationType.toLowerCase()).append("-")
.append(getChannelThread()).append("-").toString();
return new StringBuffer(engineName.toLowerCase()).append("-").append(communicationType.toLowerCase()).append("-default-").toString();
}

public Thread newThread(Runnable r) {
Expand Down

0 comments on commit 0f87d7c

Please sign in to comment.