diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java index e7ec9a7f01..c59785f1b0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -105,6 +105,8 @@ private Constants() { public static final String CHANNEL_FILESYNC_RELOAD = "filesync_reload"; public static final String CHANNEL_DYNAMIC = "dynamic"; + + public static final String CHANNEL_MONITOR = "monitor"; public static final String PUSH_JOB_TIMER = "job.push"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index f8e81acd9f..57a111cfd0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -95,6 +95,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement final String CTX_KEY_FLUSHED_TRIGGER_ROUTERS = "FlushedTriggerRouters." + ConfigurationChangedDataRouter.class.getSimpleName() + hashCode(); + + final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink." + + ConfigurationChangedDataRouter.class.getSimpleName() + hashCode(); public final static String KEY = "symconfig"; @@ -239,6 +242,10 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData if (tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION)) { routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE); } + + if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) { + routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE); + } } } @@ -633,6 +640,12 @@ public void contextCommitted(SimpleRouterContext routingContext) { log.info("About to refresh the cache of node security because new configuration came through the data router"); engine.getNodeService().flushNodeAuthorizedCache(); } + + if (routingContext.get(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED) != null) { + log.info("About to refresh the cache of node group link because new configuration came through the data router"); + engine.getConfigurationService().clearCache(); + engine.getNodeService().flushNodeGroupCache(); + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index fd73926358..670b8a852f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -576,7 +576,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost, // node table, but not in node security. // lets go ahead and try to insert into node security. sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { - nodeId, password, nodeService.findNode(nodeId).getNodeId() }); + nodeId, password, nodeService.findIdentityNodeId() }); log.info("Registration was opened for {}", nodeId); } else if (updateCount == 0) { log.warn("Registration was already enabled for {}. No need to reenable it", nodeId); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java index a290f54fc2..a3ff346b47 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java @@ -29,6 +29,7 @@ import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.SqlException; +import org.jumpmind.exception.ParseException; import org.jumpmind.symmetric.io.IoConstants; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; @@ -147,9 +148,27 @@ public void write(CsvData data) { if (targetTable != null || !data.requiresTable() || (targetTable == null && data.getDataEventType() == DataEventType.SQL)) { try { + statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT); statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER); if (filterBefore(data)) { + + switch (data.getDataEventType()) { + case UPDATE: + case INSERT: + if (sourceTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) { + throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", sourceTable.getName(), sourceTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length)); + } + break; + case DELETE: + if (sourceTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) { + throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", sourceTable.getName(), sourceTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length)); + } + break; + default: + break; + } + LoadStatus loadStatus = LoadStatus.SUCCESS; switch (data.getDataEventType()) { case UPDATE: diff --git a/symmetric-server/src/main/deploy/conf/sym_service.conf b/symmetric-server/src/main/deploy/conf/sym_service.conf index 575665f1e5..f15ef84228 100644 --- a/symmetric-server/src/main/deploy/conf/sym_service.conf +++ b/symmetric-server/src/main/deploy/conf/sym_service.conf @@ -29,8 +29,9 @@ wrapper.java.additional.12=-Djava.net.preferIPv4Stack=true wrapper.java.additional.13=-Dcom.sun.management.jmxremote wrapper.java.additional.14=-Dcom.sun.management.jmxremote.authenticate=false wrapper.java.additional.15=-Dcom.sun.management.jmxremote.port=31418 -wrapper.java.additional.16=-Dcom.sun.management.jmxremote.ssl=false -wrapper.java.additional.17=-Djava.rmi.server.hostname=localhost +wrapper.java.additional.16=-Dcom.sun.management.jmxremote.rmi.port=31418 +wrapper.java.additional.17=-Dcom.sun.management.jmxremote.ssl=false +wrapper.java.additional.18=-Djava.rmi.server.hostname=localhost # Initial Java Heap Size (in MB) wrapper.java.initmemory=256