Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
jumpmind-josh committed Jun 13, 2017
2 parents c7d814e + 570f237 commit 1d51750
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 3 deletions.
Expand Up @@ -105,6 +105,8 @@ private Constants() {
public static final String CHANNEL_FILESYNC_RELOAD = "filesync_reload";

public static final String CHANNEL_DYNAMIC = "dynamic";

public static final String CHANNEL_MONITOR = "monitor";

public static final String PUSH_JOB_TIMER = "job.push";

Expand Down
Expand Up @@ -95,6 +95,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement

final String CTX_KEY_FLUSHED_TRIGGER_ROUTERS = "FlushedTriggerRouters."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

Expand Down Expand Up @@ -239,6 +242,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
if (tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION)) {
routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE);
}

if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE);
}
}
}

Expand Down Expand Up @@ -633,6 +640,12 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to refresh the cache of node security because new configuration came through the data router");
engine.getNodeService().flushNodeAuthorizedCache();
}

if (routingContext.get(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED) != null) {
log.info("About to refresh the cache of node group link because new configuration came through the data router");
engine.getConfigurationService().clearCache();
engine.getNodeService().flushNodeGroupCache();
}

}
}
Expand Down
Expand Up @@ -576,7 +576,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost,
// node table, but not in node security.
// lets go ahead and try to insert into node security.
sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] {
nodeId, password, nodeService.findNode(nodeId).getNodeId() });
nodeId, password, nodeService.findIdentityNodeId() });
log.info("Registration was opened for {}", nodeId);
} else if (updateCount == 0) {
log.warn("Registration was already enabled for {}. No need to reenable it", nodeId);
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.SqlException;
import org.jumpmind.exception.ParseException;
import org.jumpmind.symmetric.io.IoConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
Expand Down Expand Up @@ -147,9 +148,27 @@ public void write(CsvData data) {
if (targetTable != null || !data.requiresTable()
|| (targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
try {

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {

switch (data.getDataEventType()) {
case UPDATE:
case INSERT:
if (sourceTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) {
throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", sourceTable.getName(), sourceTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length));
}
break;
case DELETE:
if (sourceTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) {
throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", sourceTable.getName(), sourceTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length));
}
break;
default:
break;
}

LoadStatus loadStatus = LoadStatus.SUCCESS;
switch (data.getDataEventType()) {
case UPDATE:
Expand Down
5 changes: 3 additions & 2 deletions symmetric-server/src/main/deploy/conf/sym_service.conf
Expand Up @@ -29,8 +29,9 @@ wrapper.java.additional.12=-Djava.net.preferIPv4Stack=true
wrapper.java.additional.13=-Dcom.sun.management.jmxremote
wrapper.java.additional.14=-Dcom.sun.management.jmxremote.authenticate=false
wrapper.java.additional.15=-Dcom.sun.management.jmxremote.port=31418
wrapper.java.additional.16=-Dcom.sun.management.jmxremote.ssl=false
wrapper.java.additional.17=-Djava.rmi.server.hostname=localhost
wrapper.java.additional.16=-Dcom.sun.management.jmxremote.rmi.port=31418
wrapper.java.additional.17=-Dcom.sun.management.jmxremote.ssl=false
wrapper.java.additional.18=-Djava.rmi.server.hostname=localhost

# Initial Java Heap Size (in MB)
wrapper.java.initmemory=256
Expand Down

0 comments on commit 1d51750

Please sign in to comment.