diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationChangedHelper.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationChangedHelper.java index 04a93bfd91..aafabfaaf5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationChangedHelper.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationChangedHelper.java @@ -32,6 +32,7 @@ import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.model.JobDefinition; +import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.service.ClusterConstants; @@ -64,10 +65,12 @@ public class ConfigurationChangedHelper { private static final String CTX_KEY_CLUSTER_NEEDED = "ClusterEnable." + SUFFIX; private ISymmetricEngine engine; private String tablePrefix; + private ConfigurationVersionHelper versionHelper; public ConfigurationChangedHelper(ISymmetricEngine engine) { this.engine = engine; tablePrefix = engine.getTablePrefix(); + versionHelper = new ConfigurationVersionHelper(tablePrefix); } public void handleChange(Context context, Table table, CsvData data) { @@ -318,4 +321,8 @@ public boolean isSyncTriggersAllowed(Context context) { public void setSyncTriggersNeeded(Context context) { context.put(CTX_KEY_RESYNC_NEEDED, true); } + + public Set filterNodes(Set nodes, String tableName) { + return versionHelper.filterNodes(nodes, tableName); + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationVersionHelper.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationVersionHelper.java new file mode 100644 index 0000000000..2603d09df6 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ConfigurationVersionHelper.java @@ -0,0 +1,79 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.common; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.jumpmind.symmetric.Version; +import org.jumpmind.symmetric.model.Node; + +public class ConfigurationVersionHelper { + protected Set proTables; + protected Map tablesByVersion; + protected boolean isTargetNodePro; + protected String targetNodeVersion; + + public ConfigurationVersionHelper(String tablePrefix) { + proTables = TableConstants.getTablesForConsole(tablePrefix); + tablesByVersion = TableConstants.getConfigTablesByVersion(tablePrefix); + } + + public ConfigurationVersionHelper(String tablePrefix, Node targetNode) { + this(tablePrefix); + setTargetNode(targetNode); + } + + public boolean shouldSendTable(String tableName) { + if (!isTargetNodePro && proTables.contains(tableName)) { + return false; + } + String tableVersion = tablesByVersion.get(tableName); + if (tableVersion != null && Version.isOlderThanVersion(targetNodeVersion, tableVersion)) { + return false; + } + return true; + } + + public Set filterNodes(Set nodes, String tableName) { + boolean isProTable = proTables.contains(tableName); + String tableVersion = tablesByVersion.get(tableName); + if (isProTable || tableVersion != null) { + Set targetNodes = new HashSet(nodes.size()); + for (Node node : nodes) { + setTargetNode(node); + if ((!isProTable || isTargetNodePro) && (tableVersion == null || !Version.isOlderThanVersion(targetNodeVersion, tableVersion))) { + targetNodes.add(node); + } + } + return targetNodes; + } else { + return nodes; + } + } + + public void setTargetNode(Node targetNode) { + targetNodeVersion = targetNode.getSymmetricVersion(); + isTargetNodePro = StringUtils.equals(targetNode.getDeploymentType(), Constants.DEPLOYMENT_TYPE_PROFESSIONAL); + } +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java index 9cdb0dca66..43ec192f68 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.StringUtils; @@ -81,7 +83,6 @@ public class TableConstants { public static final String SYM_CONSOLE_TABLE_STATS = "console_table_stats"; public static final String SYM_DESIGN_DIAGRAM = "design_diagram"; public static final String SYM_DIAGRAM_GROUP = "diagram_group"; - public static final String SYM_DIAGRAM_GROUP_LINK = "diagram_group_link"; public static final String SYM_EXTENSION = "extension"; public static final String SYM_MONITOR = "monitor"; public static final String SYM_MONITOR_EVENT = "monitor_event"; @@ -104,12 +105,21 @@ public static final Set getTables(String tablePrefix) { SYM_REGISTRATION_REDIRECT, SYM_REGISTRATION_REQUEST, SYM_ROUTER, SYM_SEQUENCE, SYM_TABLE_RELOAD_REQUEST, SYM_TABLE_RELOAD_STATUS, SYM_TRANSFORM_TABLE, SYM_TRANSFORM_COLUMN, SYM_TRIGGER, SYM_TRIGGER_HIST, SYM_TRIGGER_ROUTER, SYM_TRIGGER_ROUTER_GROUPLET); if (hasConsoleSchema) { - addPrefixToTableNames(tables, tablePrefix, SYM_CONSOLE_EVENT, SYM_CONSOLE_USER, SYM_CONSOLE_USER_HIST, SYM_CONSOLE_ROLE, - SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_TABLE_STATS, SYM_DESIGN_DIAGRAM, SYM_DIAGRAM_GROUP); + tables.addAll(getTablesForConsole(tablePrefix)); } return tables; } + /** + * Set of all SymmetricDS configuration and runtime tables used in professional console. + */ + public static final Set getTablesForConsole(String tablePrefix) { + Set tables = new HashSet(); + addPrefixToTableNames(tables, tablePrefix, SYM_CONSOLE_EVENT, SYM_CONSOLE_USER, SYM_CONSOLE_USER_HIST, SYM_CONSOLE_ROLE, + SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_TABLE_STATS, SYM_DESIGN_DIAGRAM, SYM_DIAGRAM_GROUP); + return tables; + } + public static final Set getTablesWithoutPrefix() { return getTables(""); } @@ -132,6 +142,23 @@ public static final List getConfigTables(String tablePrefix) { return tables; } + /** + * Map with key of each configuration table and value of the SymmetricDS version when they were introduced. + */ + public static final Map getConfigTablesByVersion(String tablePrefix) { + Map map = new HashMap(); + addPrefixToTableNames(map, tablePrefix, "3.3.0", SYM_GROUPLET, SYM_GROUPLET_LINK, SYM_TRIGGER_ROUTER_GROUPLET); + addPrefixToTableNames(map, tablePrefix, "3.5.0", SYM_FILE_TRIGGER, SYM_FILE_TRIGGER_ROUTER, SYM_FILE_SNAPSHOT, SYM_EXTRACT_REQUEST, + SYM_NODE_GROUP_CHANNEL_WND); + addPrefixToTableNames(map, tablePrefix, "3.7.0", SYM_EXTENSION); + addPrefixToTableNames(map, tablePrefix, "3.8.0", SYM_NOTIFICATION, SYM_MONITOR, SYM_MONITOR_EVENT, SYM_CONSOLE_EVENT); + addPrefixToTableNames(map, tablePrefix, "3.8.18", SYM_CONSOLE_USER_HIST); + addPrefixToTableNames(map, tablePrefix, "3.9.0", SYM_JOB); + addPrefixToTableNames(map, tablePrefix, "3.10.0", SYM_TABLE_RELOAD_STATUS); + addPrefixToTableNames(map, tablePrefix, "3.12.0", SYM_CONSOLE_ROLE, SYM_CONSOLE_ROLE_PRIVILEGE, SYM_DESIGN_DIAGRAM, SYM_DIAGRAM_GROUP); + return map; + } + /** * Which tables from getConfigTables() should not be sent during registration. These tables will still have a trigger installed for capturing and sending * changes. @@ -154,8 +181,8 @@ public static final Set getConfigTablesWithoutCapture(String tablePrefix * Which tables from getConfigTables() should be excluded from a configuration export. */ public static final String[] getConfigTablesExcludedFromExport() { - return new String[] { SYM_NODE, SYM_NODE_SECURITY, SYM_NODE_IDENTITY, SYM_NODE_HOST, SYM_NODE_CHANNEL_CTL, SYM_FILE_SNAPSHOT, SYM_CONSOLE_USER, - SYM_CONSOLE_ROLE, SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_USER_HIST, SYM_TABLE_RELOAD_REQUEST, SYM_CONSOLE_EVENT, SYM_MONITOR_EVENT }; + return new String[] { SYM_NODE, SYM_NODE_SECURITY, SYM_NODE_IDENTITY, SYM_NODE_HOST, SYM_FILE_SNAPSHOT, SYM_CONSOLE_USER, + SYM_CONSOLE_ROLE, SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_USER_HIST, SYM_TABLE_RELOAD_REQUEST, SYM_MONITOR_EVENT }; } /** @@ -169,6 +196,12 @@ public static final List getConfigTablesForExport(String tablePrefix) { return tables; } + protected static final void addPrefixToTableNames(Map map, String tablePrefix, String version, String... names) { + for (String name : names) { + map.put(getTableName(tablePrefix, name), version); + } + } + protected static final void addPrefixToTableNames(Collection collection, String tablePrefix, String... names) { for (String name : names) { collection.add(getTableName(tablePrefix, name)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index 63b1c2e835..afbfb202c9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.extension.IBuiltInExtensionPoint; import org.jumpmind.symmetric.ISymmetricEngine; @@ -67,8 +66,7 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED)); } helper.handleChange(routingContext, dataMetaData.getTable(), dataMetaData.getData()); - possibleTargetNodes = filterOutOlderNodes(dataMetaData, possibleTargetNodes); - possibleTargetNodes = filterOutNodesByDeploymentType(dataMetaData, possibleTargetNodes); + possibleTargetNodes = helper.filterNodes(possibleTargetNodes, tableName(dataMetaData.getTable().getNameLowerCase())); // the list of nodeIds that we will return Set nodeIds = new HashSet(); // the inbound data @@ -237,59 +235,6 @@ private TriggerRouter findTriggerRouter(List triggerRouters, Stri return null; } - protected Set filterOutNodesByDeploymentType(DataMetaData dataMetaData, Set possibleTargetNodes) { - if (tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_USER) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_USER_HIST) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE) - || tableMatches(dataMetaData, TableConstants.SYM_DESIGN_DIAGRAM) - || tableMatches(dataMetaData, TableConstants.SYM_DIAGRAM_GROUP)) { - Set targetNodes = new HashSet(possibleTargetNodes.size()); - for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) { - boolean isTargetProfessional = StringUtils.equals(nodeThatMayBeRoutedTo.getDeploymentType(), - Constants.DEPLOYMENT_TYPE_PROFESSIONAL); - if (isTargetProfessional) { - targetNodes.add(nodeThatMayBeRoutedTo); - } - } - return targetNodes; - } else { - return possibleTargetNodes; - } - } - - protected Set filterOutOlderNodes(DataMetaData dataMetaData, Set possibleTargetNodes) { - if (tableMatches(dataMetaData, TableConstants.SYM_MONITOR) - || tableMatches(dataMetaData, TableConstants.SYM_MONITOR_EVENT) - || tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION) - || tableMatches(dataMetaData, TableConstants.SYM_JOB) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE) - || tableMatches(dataMetaData, TableConstants.SYM_DIAGRAM_GROUP) - || tableMatches(dataMetaData, TableConstants.SYM_DESIGN_DIAGRAM)) { - Set targetNodes = new HashSet(possibleTargetNodes.size()); - for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) { - if (tableMatches(dataMetaData, TableConstants.SYM_JOB)) { - if (nodeThatMayBeRoutedTo.isVersionGreaterThanOrEqualTo(3, 9, 0)) { - targetNodes.add(nodeThatMayBeRoutedTo); - } - } else if (tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE) - || tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE) - || tableMatches(dataMetaData, TableConstants.SYM_DIAGRAM_GROUP) - || tableMatches(dataMetaData, TableConstants.SYM_DESIGN_DIAGRAM)) { - if (nodeThatMayBeRoutedTo.isVersionGreaterThanOrEqualTo(3, 12, 0)) { - targetNodes.add(nodeThatMayBeRoutedTo); - } - } else if (nodeThatMayBeRoutedTo.isVersionGreaterThanOrEqualTo(3, 8, 0)) { - targetNodes.add(nodeThatMayBeRoutedTo); - } - } - return targetNodes; - } else { - return possibleTargetNodes; - } - } - protected void routeNodeTables(Set nodeIds, Map columnValues, NetworkedNode rootNetworkedNode, Node me, SimpleRouterContext routingContext, DataMetaData dataMetaData, Set possibleTargetNodes, boolean initialLoad) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 629969d552..5b1b4c5d01 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -75,6 +75,7 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.Version; +import org.jumpmind.symmetric.common.ConfigurationVersionHelper; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ErrorConstants; import org.jumpmind.symmetric.common.ParameterConstants; @@ -210,155 +211,79 @@ public DataExtractorService(ISymmetricEngine engine) { createSqlReplacementTokens())); } - protected boolean filter(Node targetNode, String tableName) { - boolean pre37 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_7_0); - boolean pre38 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_8_0); - boolean pre3818 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_8_18); - boolean pre39 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_9_0); - boolean pre311 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_11_0); - boolean pre312 = Version.isOlderThanVersion(targetNode.getSymmetricVersionParts(), Version.VERSION_3_12_0); - tableName = tableName.toLowerCase(); - boolean include = true; - if (pre39 && (tableName.contains(TableConstants.SYM_JOB) || tableName.contains(TableConstants.SYM_CONSOLE_ROLE) || tableName.contains( - TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE))) { - include = false; - } else if (pre37 && tableName.contains(TableConstants.SYM_EXTENSION)) { - include = false; - } else if (pre38 && (tableName.contains(TableConstants.SYM_MONITOR) || - tableName.contains(TableConstants.SYM_NOTIFICATION))) { - include = false; - } else if (pre3818 && tableName.contains(TableConstants.SYM_CONSOLE_USER_HIST)) { - include = false; - } else if (pre311 && (tableName.contains(TableConstants.SYM_CONSOLE_ROLE) - || tableName.contains(TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE))) { - include = false; - } else if (pre312 && (tableName.contains(TableConstants.SYM_DESIGN_DIAGRAM) - || tableName.contains(TableConstants.SYM_DIAGRAM_GROUP))) { - include = false; - } else if (tableName.contains(TableConstants.SYM_CONSOLE_USER) - || tableName.contains(TableConstants.SYM_CONSOLE_USER_HIST) - || tableName.contains(TableConstants.SYM_CONSOLE_EVENT) - || tableName.contains(TableConstants.SYM_CONSOLE_ROLE) - || tableName.contains(TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE) - || tableName.contains(TableConstants.SYM_DESIGN_DIAGRAM) - || tableName.contains(TableConstants.SYM_DIAGRAM_GROUP)) { - boolean isTargetProfessional = StringUtils.equals(targetNode.getDeploymentType(), - Constants.DEPLOYMENT_TYPE_PROFESSIONAL); - if (!isTargetProfessional) { - include = false; - } - } - return include; - } - /** * Extract the SymmetricDS configuration for the passed in {@link Node}. */ - public void extractConfigurationStandalone(Node targetNode, Writer writer, - String... tablesToExclude) { + public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) { Node sourceNode = nodeService.findIdentity(); if (targetNode != null && sourceNode != null) { - Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, - Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), + Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), sourceNode.getNodeId(), targetNode.getNodeId(), false); - NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(), - targetNode.getNodeGroupId()); - List triggerRouters = triggerRouterService - .buildTriggerRoutersForSymmetricTables( - StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version - .version() : targetNode.getSymmetricVersion(), nodeGroupLink, - tablesToExclude); - List initialLoadEvents = new ArrayList( - triggerRouters.size() * 2); - for (int i = triggerRouters.size() - 1; i >= 0; i--) { - TriggerRouter triggerRouter = triggerRouters.get(i); - String channelId = triggerRouter.getTrigger().getChannelId(); - if (Constants.CHANNEL_CONFIG.equals(channelId) - || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { - if (filter(targetNode, triggerRouter.getTrigger().getSourceTableName())) { - TriggerHistory triggerHistory = triggerRouterService - .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() - .getTriggerId(), null, null, triggerRouter.getTrigger() - .getSourceTableName()); - if (triggerHistory == null) { - Trigger trigger = triggerRouter.getTrigger(); - Table table = symmetricDialect.getPlatform().getTableFromCache( - trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), false); - if (table == null) { - throw new IllegalStateException("Could not find a required table: " - + triggerRouter.getTrigger().getSourceTableName()); - } - triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(), - symmetricDialect.getTriggerTemplate()); - triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); + NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(), targetNode.getNodeGroupId()); + List configTriggerRouters = triggerRouterService.buildTriggerRoutersForSymmetricTables(StringUtils.isBlank(targetNode + .getSymmetricVersion()) ? Version.version() : targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude); + List initialLoadEvents = new ArrayList(configTriggerRouters.size() * 2); + ConfigurationVersionHelper helper = new ConfigurationVersionHelper(symmetricDialect.getTablePrefix(), targetNode); + List triggerRouters = new ArrayList(); + List triggerHistories = new ArrayList(); + for (int i = 0; i < configTriggerRouters.size(); i++) { + TriggerRouter triggerRouter = configTriggerRouters.get(i); + Trigger trigger = triggerRouter.getTrigger(); + String channelId = trigger.getChannelId(); + String tableName = trigger.getSourceTableName(); + if ((Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) && helper.shouldSendTable(tableName)) { + TriggerHistory triggerHistory = triggerRouterService.getNewestTriggerHistoryForTrigger(trigger.getTriggerId(), null, null, tableName); + if (triggerHistory == null) { + Table table = platform.getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), tableName, false); + if (table == null) { + throw new IllegalStateException("Could not find a required table: " + tableName); } - StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor( - targetNode, triggerRouter, triggerHistory)); - addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger() - .getSourceTableName(), sql); - String sourceTable = triggerHistory.getSourceTableName(); - Data data = new Data(1, null, sql.toString(), DataEventType.SQL, - sourceTable, null, triggerHistory, triggerRouter.getTrigger() - .getChannelId(), null, null); - initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); + triggerHistory = new TriggerHistory(table, trigger, symmetricDialect.getTriggerTemplate()); + triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); } + triggerRouters.add(triggerRouter); + triggerHistories.add(triggerHistory); } } + for (int i = triggerRouters.size() - 1; i >= 0; i--) { + TriggerRouter triggerRouter = triggerRouters.get(i); + TriggerHistory triggerHistory = triggerHistories.get(i); + StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory)); + addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger().getSourceTableName(), sql); + Data data = new Data(1, null, sql.toString(), DataEventType.SQL, triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter + .getTrigger().getChannelId(), null, null); + initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); + } for (int i = 0; i < triggerRouters.size(); i++) { TriggerRouter triggerRouter = triggerRouters.get(i); - String channelId = triggerRouter.getTrigger().getChannelId(); - if (Constants.CHANNEL_CONFIG.equals(channelId) - || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { - if (filter(targetNode, triggerRouter.getTrigger().getSourceTableName())) { - TriggerHistory triggerHistory = triggerRouterService - .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() - .getTriggerId(), null, null, null); - if (triggerHistory == null) { - Trigger trigger = triggerRouter.getTrigger(); - triggerHistory = new TriggerHistory(symmetricDialect.getPlatform() - .getTableFromCache(trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), - trigger.getSourceTableName(), false), trigger, - symmetricDialect.getTriggerTemplate()); - triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); - } - Table table = symmetricDialect.getPlatform().getTableFromCache( - triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), - triggerHistory.getSourceTableName(), false); - String initialLoadSql = "1=1 order by "; - String quote = symmetricDialect.getPlatform().getDdlBuilder().getDatabaseInfo().getDelimiterToken(); - Column[] pkColumns = table.getPrimaryKeyColumns(); - for (int j = 0; j < pkColumns.length; j++) { - if (j > 0) { - initialLoadSql += ", "; - } - initialLoadSql += quote + pkColumns[j].getName() + quote; - } - if (!triggerRouter.getTrigger().getSourceTableName() - .endsWith(TableConstants.SYM_NODE_IDENTITY)) { - initialLoadEvents.add(new SelectFromTableEvent(targetNode, - triggerRouter, triggerHistory, initialLoadSql)); - } else { - Data data = new Data(1, null, targetNode.getNodeId(), - DataEventType.INSERT, triggerHistory.getSourceTableName(), - null, triggerHistory, - triggerRouter.getTrigger().getChannelId(), null, null); - initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); - } + TriggerHistory triggerHistory = triggerHistories.get(i); + Table table = symmetricDialect.getPlatform().getTableFromCache(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), + triggerHistory.getSourceTableName(), false); + String initialLoadSql = "1=1 order by "; + String quote = platform.getDdlBuilder().getDatabaseInfo().getDelimiterToken(); + Column[] pkColumns = table.getPrimaryKeyColumns(); + for (int j = 0; j < pkColumns.length; j++) { + if (j > 0) { + initialLoadSql += ", "; } + initialLoadSql += quote + pkColumns[j].getName() + quote; + } + if (!triggerRouter.getTrigger().getSourceTableName().endsWith(TableConstants.SYM_NODE_IDENTITY)) { + initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory, initialLoadSql)); + } else { + Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT, triggerHistory.getSourceTableName(), null, triggerHistory, + triggerRouter.getTrigger().getChannelId(), null, null); + initialLoadEvents.add(new SelectFromTableEvent(data, triggerRouter)); } } SelectFromTableSource source = new SelectFromTableSource(engine, batch, initialLoadEvents); source.setConfiguration(true); - ExtractDataReader dataReader = new ExtractDataReader( - this.symmetricDialect.getPlatform(), source); - ProtocolDataWriter dataWriter = new ProtocolDataWriter( - nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false); + ExtractDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(), source); + ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity(), false, false); List transformsList = transformService.getConfigExtractTransforms(nodeGroupLink); TransformTable[] transforms = transformsList.toArray(new TransformTable[transformsList.size()]); - TransformWriter transformWriter = new TransformWriter(symmetricDialect.getTargetPlatform(), TransformPoint.EXTRACT, dataWriter, - transformService.getColumnTransforms(), transforms); + TransformWriter transformWriter = new TransformWriter(symmetricDialect.getTargetPlatform(), TransformPoint.EXTRACT, dataWriter, transformService + .getColumnTransforms(), transforms); DataContext ctx = new DataContext(); DataProcessor processor = new DataProcessor(dataReader, transformWriter, "configuration extract"); ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); @@ -366,8 +291,7 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer, ctx.put(Constants.DATA_CONTEXT_ENGINE, engine); processor.process(ctx); if (triggerRouters.size() == 0) { - log.error("{} attempted registration, but was sent an empty configuration", - targetNode); + log.error("{} attempted registration, but was sent an empty configuration", targetNode); } } } diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/VersionUnitTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/VersionUnitTest.java index 1d47aeaa9f..b9cc3ca722 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/VersionUnitTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/VersionUnitTest.java @@ -32,6 +32,10 @@ public void testIsOlderThanVersion() { assertFalse(Version.isOlderThanVersion("1.6.0", "1.6.0")); assertFalse(Version.isOlderThanVersion("1.6.1", "1.6.0")); assertFalse(Version.isOlderThanVersion("2.0.0", "1.6.0")); + assertFalse(Version.isOlderThanVersion("SNAPSHOT", "1.6.0")); + assertTrue(Version.isOlderThanVersion("2.0.0", "SNAPSHOT")); + assertFalse(Version.isOlderThanVersion("development", "1.6.0")); + assertTrue(Version.isOlderThanVersion("2.0.0", "development")); } @Test diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/common/ConfigurationVersionHelperTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/common/ConfigurationVersionHelperTest.java new file mode 100644 index 0000000000..9fbee668d3 --- /dev/null +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/common/ConfigurationVersionHelperTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.common; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.jumpmind.symmetric.Version; +import org.jumpmind.symmetric.model.Node; +import org.junit.jupiter.api.Test; + +public class ConfigurationVersionHelperTest { + protected final String PREFIX = "sym"; + + @Test + public void testFilterNodesOss() { + testFilterNodes(false); + } + + @Test + public void testFilterNodesPro() { + testFilterNodes(true); + } + + public void testFilterNodes(boolean isPro) { + ConfigurationVersionHelper helper = new ConfigurationVersionHelper(PREFIX); + Map> versionWithTables = getVersionWithTables(); + List versions = new ArrayList(versionWithTables.keySet()); + versions.sort(new Comparator() { + @Override + public int compare(String s1, String s2) { + return s1.equals(s2) ? 0 : Version.isOlderThanVersion(s1, s2) ? -1 : 1; + } + }); + Set nodes = new HashSet(); + for (String version : versions) { + Node node = new Node(version, version); + node.setSymmetricVersion(version); + node.setDeploymentType(isPro ? Constants.DEPLOYMENT_TYPE_PROFESSIONAL : Constants.DEPLOYMENT_TYPE_SERVER); + nodes.add(node); + } + List shouldSendTables = TableConstants.getConfigTables(PREFIX); + List shouldNotSendTables = new ArrayList(); + for (String table : TableConstants.getConfigTablesByVersion(PREFIX).keySet()) { + shouldSendTables.remove(table); + shouldNotSendTables.add(table); + } + Set proTables = TableConstants.getTablesForConsole(PREFIX); + for (String version : versions) { + List newTables = versionWithTables.get(version); + if (newTables != null) { + for (String table : newTables) { + if (isPro || !proTables.contains(table)) { + shouldSendTables.add(table); + shouldNotSendTables.remove(table); + } + } + } + for (String table : shouldSendTables) { + Set filteredNodes = helper.filterNodes(nodes, table); + assertTrue("Should send table " + table + " to node " + version, filteredNodes.contains(new Node(version, version))); + } + for (String table : shouldNotSendTables) { + Set filteredNodes = helper.filterNodes(nodes, table); + assertFalse("Should NOT send table " + table + " to node " + version, filteredNodes.contains(new Node(version, version))); + } + } + } + + @Test + public void testShouldSendTableOss() { + testShouldSendTable(false); + } + + @Test + public void testShouldSendTablePro() { + testShouldSendTable(true); + } + + public void testShouldSendTable(boolean isPro) { + Map> versionWithTables = getVersionWithTables(); + List shouldSendTables = TableConstants.getConfigTables(PREFIX); + List shouldNotSendTables = new ArrayList(); + for (String table : TableConstants.getConfigTablesByVersion(PREFIX).keySet()) { + shouldSendTables.remove(table); + shouldNotSendTables.add(table); + } + String deployment = isPro ? Constants.DEPLOYMENT_TYPE_PROFESSIONAL : Constants.DEPLOYMENT_TYPE_SERVER; + Set proTables = TableConstants.getTablesForConsole(PREFIX); + for (String version : getVersions()) { + List newTables = versionWithTables.get(version); + if (newTables != null) { + for (String table : newTables) { + if (isPro || !proTables.contains(table)) { + shouldSendTables.add(table); + shouldNotSendTables.remove(table); + } + } + } + shouldSend(version, shouldSendTables, deployment); + shouldNotSend(version, shouldNotSendTables, deployment); + } + shouldSend("development", shouldSendTables, deployment); + shouldSend("SNAPSHOT", shouldSendTables, deployment); + } + + protected List getVersions() { + List versions = new ArrayList(); + for (int i = 2; i < 5; i++) { + for (int j = 0; j < 20; j++) { + for (int k = 0; k < 20; k++) { + versions.add(i + "." + j + "." + k); + } + } + } + return versions; + } + + protected Map> getVersionWithTables() { + Map> versionWithTables = new HashMap>(); + for (Map.Entry entry : TableConstants.getConfigTablesByVersion(PREFIX).entrySet()) { + String table = entry.getKey(); + String version = entry.getValue(); + List tables = versionWithTables.get(version); + if (tables == null) { + tables = new ArrayList(); + } + tables.add(table); + versionWithTables.put(version, tables); + } + return versionWithTables; + } + + protected void shouldSend(String version, List tables, String deployment) { + checkSend(version, tables, true, deployment); + } + + protected void shouldNotSend(String version, List tables, String deployment) { + checkSend(version, tables, false, deployment); + } + + protected void checkSend(String version, List tables, boolean shouldSend, String deployment) { + ConfigurationVersionHelper helper = new ConfigurationVersionHelper(PREFIX); + Node node = new Node(); + node.setSymmetricVersion(version); + node.setDeploymentType(deployment); + helper.setTargetNode(node); + for (String table : tables) { + String message = "Version " + version + " node " + (shouldSend ? "SHOULD" : "should NOT") + " be sent " + table + " table"; + if (shouldSend) { + assertTrue(message, helper.shouldSendTable(table)); + } else { + assertFalse(message, helper.shouldSendTable(table)); + } + } + } +} diff --git a/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java b/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java index 569e494507..dedd0dd7f0 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/AbstractVersion.java @@ -158,7 +158,9 @@ public boolean isOlderVersion(String version) { } public boolean isOlderThanVersion(String checkVersion, String targetVersion) { - if (noVersion(targetVersion) || noVersion(checkVersion)) { + if (noVersion(targetVersion)) { + return true; + } else if (noVersion(checkVersion)) { return false; } int[] checkVersions = parseVersion(checkVersion); @@ -184,7 +186,7 @@ public boolean isOlderThanVersion(int[] checkVersion, int[] targetVersion) { } protected boolean noVersion(String targetVersion) { - return StringUtils.isBlank(targetVersion) || "development".equals(targetVersion); + return StringUtils.isBlank(targetVersion) || "development".equals(targetVersion) || targetVersion.contains("SNAPSHOT"); } public boolean isDevelopment(String version) {