Skip to content

Commit

Permalink
Added auto extension registration. Also added support for registratio…
Browse files Browse the repository at this point in the history
…n redirect routing.
  • Loading branch information
chenson42 committed Jul 17, 2009
1 parent 300fc61 commit 71e83b8
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 54 deletions.
Expand Up @@ -33,6 +33,8 @@
import org.jumpmind.symmetric.load.IDataLoaderFilter;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.load.ITableColumnFilter;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;
import org.jumpmind.symmetric.security.INodePasswordFilter;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IBootstrapService;
Expand All @@ -42,6 +44,7 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.service.IRoutingService;
import org.jumpmind.symmetric.transport.IAcknowledgeEventListener;
import org.jumpmind.symmetric.transport.ISyncUrlExtension;
import org.jumpmind.symmetric.transport.ITransportManager;
Expand Down Expand Up @@ -71,6 +74,8 @@ public class ExtensionProcessor implements BeanFactoryPostProcessor {
IRegistrationService registrationService;

ITransportManager transportManager;

IRoutingService routingService;

@SuppressWarnings("unchecked")
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
Expand Down Expand Up @@ -167,7 +172,13 @@ private void registerExtension(String beanName, IExtensionPoint ext) {
nodeService.setNodeIdGenerator((INodeIdGenerator) ext);
}

// TODO Add new routing extensions
if (ext instanceof IDataRouter) {
routingService.addDataRouter(beanName, (IDataRouter)ext);
}

if (ext instanceof IBatchAlgorithm) {
routingService.addBatchAlgorithm(beanName, (IBatchAlgorithm)ext);
}
}

public void setDataLoaderService(IDataLoaderService dataLoaderService) {
Expand Down Expand Up @@ -206,4 +217,7 @@ public void setTransportManager(ITransportManager transportManager) {
this.transportManager = transportManager;
}

public void setRoutingService(IRoutingService routingService) {
this.routingService = routingService;
}
}
Expand Up @@ -27,8 +27,39 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IRegistrationService;

/**
* This data router is invoked when the router_name='column'. The
* router_expression is always a name value pair of a column on the table that
* is being synchronized to the value it should be matched with.
* <P>
* The value can be a constant. In the data router the value of the new data is
* always represented by a string so all comparisons are done in the format that
* SymmetricDS transmits.
* <P>
* The value can also be one of the following expressions:
* <ol>
* <li>:NODE_ID</li>
* <li>:EXTERNAL_ID</li>
* <li>:NODE_GROUP_ID</li>
* <li>:REDIRECT_NODE</li>
* </ol>
* NODE_ID, EXTERNAL_ID, and NODE_GROUP_ID are instructions for the column
* matcher to select nodes that have a NODE_ID, EXTERNAL_ID or NODE_GROUP_ID
* that are equal to the value on the column.
* <P>
* REDIRECT_NODE is an instruction to match the specified column to a
* registrant_external_id on registration_redirect and return the associated
* registration_node_id in the list of node id to route to. For example, if the
* 'price' table was being routed to to a region 1 node based on the store_id,
* the store_id would be the external_id of a node in the registration_redirect
* table and the router_expression for trigger entry for the 'price' table would
* be 'store_id=:REDIRECT_NODE' and the router_name would be 'column'.
*
*/
public class ColumnMatchDataRouter extends AbstractDataRouter implements IDataRouter {
private IRegistrationService registrationService;

public Collection<String> routeToNodes(DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
Collection<String> nodeIds = null;
Expand Down Expand Up @@ -59,6 +90,17 @@ public Collection<String> routeToNodes(DataMetaData dataMetaData, Set<Node> node
nodeIds.add(node.getNodeId());
}
}
} else if (value.equalsIgnoreCase(":REDIRECT_NODE")) {
nodeIds = new HashSet<String>();
// TODO should we do any caching here? I am starting to lose
// track of where all
// we cache. Maybe we need a pattern or central service or
// manager for caching??
Map<String, String> redirectMap = registrationService.getRegistrationRedirectMap();
String nodeId = redirectMap.get(columnValues.get(column));
if (nodeId != null) {
nodeIds.add(nodeId);
}
} else {
if (value.equals(columnValues.get(column))) {
nodeIds = toNodeIds(nodes);
Expand All @@ -73,4 +115,7 @@ public Collection<String> routeToNodes(DataMetaData dataMetaData, Set<Node> node

}

public void setRegistrationService(IRegistrationService registrationService) {
this.registrationService = registrationService;
}
}
Expand Up @@ -31,8 +31,8 @@
* certain nodes with data changes. SymmetricDS comes with a build-in data routers like
* {@link SubSelectDataRouter} and {@link ColumnMatchDataRouter}.
* <p>
* In order to configure a data router you use the router_name and routing_expression column on
* sym_trigger. The given Spring bean name of the {@link IDataRouter} is the router_name and
* In order to configure a data router you use the router_name and router_expression column on
* the trigger table. The given Spring bean name of the {@link IDataRouter} is the router_name and
* each data router is configured using the routing_expression according to its implementation.
*
* @since 2.0
Expand Down

This file was deleted.

Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.security.INodePasswordFilter;
Expand Down Expand Up @@ -73,5 +74,7 @@ public interface IRegistrationService {
* Add an entry to the registation_redirect table so that if a node tries to register here. It will be redirected to the correct node.
*/
public void saveRegistrationRedirect(String externalIdToRedirect, String nodeIdToRedirectTo);

public Map<String,String> getRegistrationRedirectMap();

}
@@ -1,15 +1,39 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.service;

import java.util.Set;

import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;


public interface IRoutingService {

public void routeData();

public boolean shouldDataBeRouted(DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad);

public void addDataRouter(String name, IDataRouter dataRouter);

public void addBatchAlgorithm(String name, IBatchAlgorithm algorithm);
}
Expand Up @@ -24,8 +24,12 @@
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
Expand All @@ -45,6 +49,8 @@
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.upgrade.UpgradeConstants;
import org.jumpmind.symmetric.util.RandomTimeSlot;
import org.springframework.jdbc.core.simple.ParameterizedRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.transaction.annotation.Transactional;

/**
Expand Down Expand Up @@ -161,6 +167,21 @@ public void markNodeAsRegistered(String nodeId) {

}

public Map<String, String> getRegistrationRedirectMap() {
SimpleJdbcTemplate template = new SimpleJdbcTemplate(this.jdbcTemplate);
return template.queryForObject(getSql("getRegistrationRedirectSql"),
new ParameterizedRowMapper<Map<String, String>>() {
public Map<String, String> mapRow(ResultSet rs, int rowNum) throws SQLException {
Map<String, String> results = new HashMap<String, String>();
do {
results.put(rs.getString(1), rs.getString(2));
} while (rs.next());
return results;
}
});

}

private void sleepBeforeRegistrationRetry() {
try {
long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomTimeSlot.getRandomValueSeededByDomainId();
Expand Down
Expand Up @@ -109,9 +109,11 @@ public void routeData() {
}

/**
* We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is
* a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is
* to reduce the amount number of connections we are required to have.
* We route data channel by channel for two reasons. One is that if/when we
* decide to multi-thread the routing it is a simple matter of inserting a
* thread pool here and waiting for all channels to be processed. The other
* reason is to reduce the amount number of connections we are required to
* have.
*/
protected void routeDataForEachChannel() {
final List<NodeChannel> channels = configurationService.getChannels();
Expand Down Expand Up @@ -235,7 +237,8 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, IRout
batch.getBatchId());
if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(
routingContext.getChannel(), history, batch, data, databaseTransactionBoundary)) {
// TODO Add route_time_ms to history. Also fix outgoing batch so we don't end up
// TODO Add route_time_ms to history. Also
// fix outgoing batch so we don't end up
// with so many history records
outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(),
history);
Expand Down Expand Up @@ -297,6 +300,14 @@ protected OutgoingBatch createNewBatch(JdbcTemplate template, String nodeId, Str
return batch;
}

public void addDataRouter(String name, IDataRouter dataRouter) {
routers.put(name, dataRouter);
}

public void addBatchAlgorithm(String name, IBatchAlgorithm algorithm) {
batchAlgorithms.put(name, algorithm);
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}
Expand Down
Expand Up @@ -59,6 +59,11 @@
update $[sym.sync.table.prefix]_registration_redirect set registration_node_id=? where registrant_external_id=?
</value>
</entry>
<entry key="getRegistrationRedirectSql">
<value>
select registrant_external_id, registration_node_id from $[sym.sync.table.prefix]_registration_redirect
</value>
</entry>
</util:map>

</beans>
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-extensions.xml
Expand Up @@ -14,6 +14,7 @@
<property name="acknowledgeService" ref="acknowledgeService" />
<property name="registrationService" ref="registrationService" />
<property name="transportManager" ref="transportManager" />
<property name="routingService" ref="routingService"/>
</bean>

<bean id="configChangedDataFilter" class="org.jumpmind.symmetric.load.ConfigurationChangedFilter">
Expand Down
6 changes: 4 additions & 2 deletions symmetric/src/main/resources/symmetric-routers.xml
Expand Up @@ -16,8 +16,10 @@
</bean>
</entry>
<entry key="column">
<bean class="org.jumpmind.symmetric.route.ColumnMatchDataRouter"/>
</entry>
<bean class="org.jumpmind.symmetric.route.ColumnMatchDataRouter">
<property name="registrationService" ref="registrationService" />
</bean>
</entry>
<entry key="default">
<bean class="org.jumpmind.symmetric.route.DefaultDataRouter">
</bean>
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.security.INodePasswordFilter;
Expand All @@ -30,11 +31,9 @@
public class MockRegistrationService implements IRegistrationService {

public void setNodePasswordFilter(INodePasswordFilter nodePasswordFilter) {
// TODO Auto-generated method stub

}
}

public boolean isAutoRegistration() {
public boolean isAutoRegistration() {
return false;
}

Expand All @@ -43,9 +42,9 @@ public void openRegistration(String nodeGroupId, String externalId) {
}

public void markNodeAsRegistered(String nodeId) {

}

public void reOpenRegistration(String nodeId) {

}
Expand All @@ -60,8 +59,11 @@ public void registerWithServer() {
public boolean isRegisteredWithServer() {
return true;
}

public void saveRegistrationRedirect(String externalIdToRedirect,
String nodeIdToRedirectTo) {

public void saveRegistrationRedirect(String externalIdToRedirect, String nodeIdToRedirectTo) {
}

public Map<String, String> getRegistrationRedirectMap() {
return null;
}
}

0 comments on commit 71e83b8

Please sign in to comment.