diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/ext/ExtensionProcessor.java b/symmetric/src/main/java/org/jumpmind/symmetric/ext/ExtensionProcessor.java index 3a635929a0..85b4d15ca2 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/ext/ExtensionProcessor.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/ext/ExtensionProcessor.java @@ -33,6 +33,8 @@ import org.jumpmind.symmetric.load.IDataLoaderFilter; import org.jumpmind.symmetric.load.IReloadListener; import org.jumpmind.symmetric.load.ITableColumnFilter; +import org.jumpmind.symmetric.route.IBatchAlgorithm; +import org.jumpmind.symmetric.route.IDataRouter; import org.jumpmind.symmetric.security.INodePasswordFilter; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IBootstrapService; @@ -42,6 +44,7 @@ import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IRegistrationService; +import org.jumpmind.symmetric.service.IRoutingService; import org.jumpmind.symmetric.transport.IAcknowledgeEventListener; import org.jumpmind.symmetric.transport.ISyncUrlExtension; import org.jumpmind.symmetric.transport.ITransportManager; @@ -71,6 +74,8 @@ public class ExtensionProcessor implements BeanFactoryPostProcessor { IRegistrationService registrationService; ITransportManager transportManager; + + IRoutingService routingService; @SuppressWarnings("unchecked") public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { @@ -167,7 +172,13 @@ private void registerExtension(String beanName, IExtensionPoint ext) { nodeService.setNodeIdGenerator((INodeIdGenerator) ext); } - // TODO Add new routing extensions + if (ext instanceof IDataRouter) { + routingService.addDataRouter(beanName, (IDataRouter)ext); + } + + if (ext instanceof IBatchAlgorithm) { + routingService.addBatchAlgorithm(beanName, (IBatchAlgorithm)ext); + } } public void setDataLoaderService(IDataLoaderService dataLoaderService) { @@ -206,4 +217,7 @@ public void setTransportManager(ITransportManager transportManager) { this.transportManager = transportManager; } + public void setRoutingService(IRoutingService routingService) { + this.routingService = routingService; + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/ColumnMatchDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/ColumnMatchDataRouter.java index f8cb7e3b9a..b44ba3983b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/ColumnMatchDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/ColumnMatchDataRouter.java @@ -27,8 +27,39 @@ import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.service.IRegistrationService; +/** + * This data router is invoked when the router_name='column'. The + * router_expression is always a name value pair of a column on the table that + * is being synchronized to the value it should be matched with. + *

+ * The value can be a constant. In the data router the value of the new data is + * always represented by a string so all comparisons are done in the format that + * SymmetricDS transmits. + *

+ * The value can also be one of the following expressions: + *

    + *
  1. :NODE_ID
  2. + *
  3. :EXTERNAL_ID
  4. + *
  5. :NODE_GROUP_ID
  6. + *
  7. :REDIRECT_NODE
  8. + *
+ * NODE_ID, EXTERNAL_ID, and NODE_GROUP_ID are instructions for the column + * matcher to select nodes that have a NODE_ID, EXTERNAL_ID or NODE_GROUP_ID + * that are equal to the value on the column. + *

+ * REDIRECT_NODE is an instruction to match the specified column to a + * registrant_external_id on registration_redirect and return the associated + * registration_node_id in the list of node id to route to. For example, if the + * 'price' table was being routed to to a region 1 node based on the store_id, + * the store_id would be the external_id of a node in the registration_redirect + * table and the router_expression for trigger entry for the 'price' table would + * be 'store_id=:REDIRECT_NODE' and the router_name would be 'column'. + * + */ public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRouter { + private IRegistrationService registrationService; public Collection routeToNodes(DataMetaData dataMetaData, Set nodes, boolean initialLoad) { Collection nodeIds = null; @@ -59,6 +90,17 @@ public Collection routeToNodes(DataMetaData dataMetaData, Set node nodeIds.add(node.getNodeId()); } } + } else if (value.equalsIgnoreCase(":REDIRECT_NODE")) { + nodeIds = new HashSet(); + // TODO should we do any caching here? I am starting to lose + // track of where all + // we cache. Maybe we need a pattern or central service or + // manager for caching?? + Map redirectMap = registrationService.getRegistrationRedirectMap(); + String nodeId = redirectMap.get(columnValues.get(column)); + if (nodeId != null) { + nodeIds.add(nodeId); + } } else { if (value.equals(columnValues.get(column))) { nodeIds = toNodeIds(nodes); @@ -73,4 +115,7 @@ public Collection routeToNodes(DataMetaData dataMetaData, Set node } + public void setRegistrationService(IRegistrationService registrationService) { + this.registrationService = registrationService; + } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java index 0ff845c4ed..b794310b04 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IDataRouter.java @@ -31,8 +31,8 @@ * certain nodes with data changes. SymmetricDS comes with a build-in data routers like * {@link SubSelectDataRouter} and {@link ColumnMatchDataRouter}. *

- * In order to configure a data router you use the router_name and routing_expression column on - * sym_trigger. The given Spring bean name of the {@link IDataRouter} is the router_name and + * In order to configure a data router you use the router_name and router_expression column on + * the trigger table. The given Spring bean name of the {@link IDataRouter} is the router_name and * each data router is configured using the routing_expression according to its implementation. * * @since 2.0 diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/RegistrationRedirectDataRouter.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/RegistrationRedirectDataRouter.java deleted file mode 100644 index a025c829ae..0000000000 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/RegistrationRedirectDataRouter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * SymmetricDS is an open source database synchronization solution. - * - * Copyright (C) Chris Henson - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 3 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see - * . - */ -package org.jumpmind.symmetric.route; - -import java.util.Collection; -import java.util.Set; - -import org.jumpmind.symmetric.model.DataMetaData; -import org.jumpmind.symmetric.model.Node; - -public class RegistrationRedirectDataRouter extends AbstractDataRouter { - - public Collection routeToNodes(DataMetaData dataMetaData, Set nodes, boolean initialLoad) { - // TODO Cache registration redirect map per run and look at specified - // column to determine which node ids to route to - return null; - } - -} diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IRegistrationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IRegistrationService.java index 670d77fcb2..9313db2e59 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IRegistrationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IRegistrationService.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.Map; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.security.INodePasswordFilter; @@ -73,5 +74,7 @@ public interface IRegistrationService { * Add an entry to the registation_redirect table so that if a node tries to register here. It will be redirected to the correct node. */ public void saveRegistrationRedirect(String externalIdToRedirect, String nodeIdToRedirectTo); + + public Map getRegistrationRedirectMap(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IRoutingService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IRoutingService.java index 8c30d09758..d951d80c56 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IRoutingService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IRoutingService.java @@ -1,9 +1,30 @@ +/* + * SymmetricDS is an open source database synchronization solution. + * + * Copyright (C) Chris Henson + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + */ package org.jumpmind.symmetric.service; import java.util.Set; import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.route.IBatchAlgorithm; +import org.jumpmind.symmetric.route.IDataRouter; public interface IRoutingService { @@ -11,5 +32,8 @@ public interface IRoutingService { public void routeData(); public boolean shouldDataBeRouted(DataMetaData dataMetaData, Set nodes, boolean initialLoad); + + public void addDataRouter(String name, IDataRouter dataRouter); + public void addBatchAlgorithm(String name, IBatchAlgorithm algorithm); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index ea522801cd..51d49c2d7d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -24,8 +24,12 @@ import java.io.IOException; import java.io.OutputStream; import java.net.ConnectException; +import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Types; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; @@ -45,6 +49,8 @@ import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.upgrade.UpgradeConstants; import org.jumpmind.symmetric.util.RandomTimeSlot; +import org.springframework.jdbc.core.simple.ParameterizedRowMapper; +import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; import org.springframework.transaction.annotation.Transactional; /** @@ -161,6 +167,21 @@ public void markNodeAsRegistered(String nodeId) { } + public Map getRegistrationRedirectMap() { + SimpleJdbcTemplate template = new SimpleJdbcTemplate(this.jdbcTemplate); + return template.queryForObject(getSql("getRegistrationRedirectSql"), + new ParameterizedRowMapper>() { + public Map mapRow(ResultSet rs, int rowNum) throws SQLException { + Map results = new HashMap(); + do { + results.put(rs.getString(1), rs.getString(2)); + } while (rs.next()); + return results; + } + }); + + } + private void sleepBeforeRegistrationRetry() { try { long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomTimeSlot.getRandomValueSeededByDomainId(); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java index 5cb4af3449..cabcff7a8d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java @@ -109,9 +109,11 @@ public void routeData() { } /** - * We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is - * a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is - * to reduce the amount number of connections we are required to have. + * We route data channel by channel for two reasons. One is that if/when we + * decide to multi-thread the routing it is a simple matter of inserting a + * thread pool here and waiting for all channels to be processed. The other + * reason is to reduce the amount number of connections we are required to + * have. */ protected void routeDataForEachChannel() { final List channels = configurationService.getChannels(); @@ -235,7 +237,8 @@ protected void routeData(Data data, Map transactionIdDataId, IRout batch.getBatchId()); if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch( routingContext.getChannel(), history, batch, data, databaseTransactionBoundary)) { - // TODO Add route_time_ms to history. Also fix outgoing batch so we don't end up + // TODO Add route_time_ms to history. Also + // fix outgoing batch so we don't end up // with so many history records outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(), history); @@ -297,6 +300,14 @@ protected OutgoingBatch createNewBatch(JdbcTemplate template, String nodeId, Str return batch; } + public void addDataRouter(String name, IDataRouter dataRouter) { + routers.put(name, dataRouter); + } + + public void addBatchAlgorithm(String name, IBatchAlgorithm algorithm) { + batchAlgorithms.put(name, algorithm); + } + public void setConfigurationService(IConfigurationService configurationService) { this.configurationService = configurationService; } diff --git a/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/registration-service-sql.xml b/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/registration-service-sql.xml index 9c9f221701..ed56e6a9e5 100644 --- a/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/registration-service-sql.xml +++ b/symmetric/src/main/resources/org/jumpmind/symmetric/services/impl/registration-service-sql.xml @@ -59,6 +59,11 @@ update $[sym.sync.table.prefix]_registration_redirect set registration_node_id=? where registrant_external_id=? + + + select registrant_external_id, registration_node_id from $[sym.sync.table.prefix]_registration_redirect + + \ No newline at end of file diff --git a/symmetric/src/main/resources/symmetric-extensions.xml b/symmetric/src/main/resources/symmetric-extensions.xml index f656ae17f9..edb7709300 100644 --- a/symmetric/src/main/resources/symmetric-extensions.xml +++ b/symmetric/src/main/resources/symmetric-extensions.xml @@ -14,6 +14,7 @@ + diff --git a/symmetric/src/main/resources/symmetric-routers.xml b/symmetric/src/main/resources/symmetric-routers.xml index ef6609d305..31668ea62c 100644 --- a/symmetric/src/main/resources/symmetric-routers.xml +++ b/symmetric/src/main/resources/symmetric-routers.xml @@ -16,8 +16,10 @@ - - + + + + diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockRegistrationService.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockRegistrationService.java index 86fa8d7c96..c52456d9da 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockRegistrationService.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockRegistrationService.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.Map; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.security.INodePasswordFilter; @@ -30,11 +31,9 @@ public class MockRegistrationService implements IRegistrationService { public void setNodePasswordFilter(INodePasswordFilter nodePasswordFilter) { - // TODO Auto-generated method stub - - } + } - public boolean isAutoRegistration() { + public boolean isAutoRegistration() { return false; } @@ -43,9 +42,9 @@ public void openRegistration(String nodeGroupId, String externalId) { } public void markNodeAsRegistered(String nodeId) { - + } - + public void reOpenRegistration(String nodeId) { } @@ -60,8 +59,11 @@ public void registerWithServer() { public boolean isRegisteredWithServer() { return true; } - - public void saveRegistrationRedirect(String externalIdToRedirect, - String nodeIdToRedirectTo) { + + public void saveRegistrationRedirect(String externalIdToRedirect, String nodeIdToRedirectTo) { + } + + public Map getRegistrationRedirectMap() { + return null; } } \ No newline at end of file