Skip to content

Commit

Permalink
Support a push reload.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Aug 24, 2008
1 parent d5585e4 commit 6cd2893
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 47 deletions.
29 changes: 20 additions & 9 deletions symmetric/src/changes/changes.xml
Expand Up @@ -6,21 +6,32 @@
</properties>
<body>
<release version="1.4.0" date="unknown" description="Lets add some polish">
<action dev="erilong" type="fix">Sorted outgoing batch in code to reduce stress on database.</action>
<action dev="erilong" type="fix">
Sorted outgoing batch in code to reduce stress on database.
Went through code to make sure ALL literals in SQL were parameterized (even the ones that are not
variable). This makes a big difference on Oracle.
</action>
<action dev="chenson42" type="add" issue="aid=1985814&amp;atid=997724">
By default, the DataLoader uses fallback logic. (If an insert violates the primary key, perform an
update instead. If an update affects no rows, perform an insert instead.) Addd a runtime property that
lets you disable the fallback logic.
</action>
<action dev="chenson42" type="add" issue="aid=1956188&amp;atid=997724">
Support an initial load pushed from client to server.
</action>
<action dev="chenson42" type="fix">
Fixed null pointer for blank my.url property. We now default my.url to blank in
symmetric-default.properties.
</action>
<action dev="erilong" type="fix">
Went through code to make sure ALL literals in SQL were parameterized (even the ones that are not variable). This makes a big difference on Oracle.
</action>
<action dev="dmichels2" type="add">Added IP address authorization.</action>
<action dev="chenson42" type="add">
Use MX4J's HTMLAdaptor for the stand-alone SymmetricDS instance so we have a nice JMX console.
Use MX4J's HTMLAdaptor for the stand-alone SymmetricDS instance so we have a nice JMX console.
</action>
<action dev="chenson42" type="add">
Added new extension architecture which allows extension points to be register automatically when they are in the
classpath and the META-INF/services/symmetric-*-ext.xml Spring file is found. Also added an interface which allows
extension points to be targeted for specific node groups.
</action>
Added new extension architecture which allows extension points to be register automatically when they
are in the classpath and the META-INF/services/symmetric-*-ext.xml Spring file is found. Also added an
interface which allows extension points to be targeted for specific node groups.
</action>
<action dev="chenson42" type="add">
Change the property names to remove the inconsistent symmetric. and symmetric.runtime. prefixes. This
change keeps backwards compatibility with the old property names.
Expand Down
Expand Up @@ -307,8 +307,8 @@ public synchronized void start() {
/**
* Queue up an initial load or a reload to a node.
*/
public void reloadNode(String nodeId) {
dataService.reloadNode(nodeId);
public String reloadNode(String nodeId) {
return dataService.reloadNode(nodeId);
}

/**
Expand Down
Expand Up @@ -175,4 +175,6 @@ public String getTimezoneOffset() {
public void setTimezoneOffset(String timezoneOffset) {
this.timezoneOffset = timezoneOffset;
}


}
Expand Up @@ -61,5 +61,13 @@ public interface INodeService {
public boolean updateNodeSecurity(NodeSecurity security);

public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled);

public String generatePassword();

/**
* Generate the next node ID that is available. Try to use the domain ID as
* the node ID.
*/
public String generateNodeId(String nodeGroupId, String externalId);

}
Expand Up @@ -158,8 +158,11 @@ public String reloadNode(String nodeId) {
if (targetNode == null) {
return "Unknown node " + nodeId;
}
nodeService.setInitialLoadEnabled(nodeId, true);
return "Successfully opened initial load for node " + nodeId;
if (nodeService.setInitialLoadEnabled(nodeId, true)) {
return "Successfully opened initial load for node " + nodeId;
} else {
return "Could not open initial load for " + nodeId;
}
}

public void insertReloadEvent(Node targetNode) {
Expand Down
Expand Up @@ -31,6 +31,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.random.RandomDataImpl;
import org.jumpmind.symmetric.model.DataEventAction;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
Expand All @@ -41,7 +42,7 @@ public class NodeService extends AbstractService implements INodeService {

@SuppressWarnings("unused")
private static final Log logger = LogFactory.getLog(NodeService.class);

private Node nodeIdentity;

/**
Expand Down Expand Up @@ -84,9 +85,23 @@ public boolean isRegistrationEnabled(String nodeId) {
*/
@SuppressWarnings("unchecked")
public NodeSecurity findNodeSecurity(String id) {
return findNodeSecurity(id, false);
}

@SuppressWarnings("unchecked")
public NodeSecurity findNodeSecurity(String id, boolean createIfNotFound) {
List<NodeSecurity> list = jdbcTemplate.query(getSql("findNodeSecuritySql"), new Object[] { id },
new NodeSecurityRowMapper());
return (NodeSecurity) getFirstEntry(list);
NodeSecurity security = (NodeSecurity) getFirstEntry(list);
if (security == null && createIfNotFound) {
insertNodeSecurity(id);
security = findNodeSecurity(id, false);
}
return security;
}

public void insertNodeSecurity(String id) {
jdbcTemplate.update(getSql("insertNodeSecuritySql"), new Object[] { id, generatePassword() });
}

public boolean updateNode(Node node) {
Expand Down Expand Up @@ -170,7 +185,7 @@ public boolean updateNodeSecurity(NodeSecurity security) {
}

public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled) {
NodeSecurity nodeSecurity = findNodeSecurity(nodeId);
NodeSecurity nodeSecurity = findNodeSecurity(nodeId, true);
if (nodeSecurity != null) {
nodeSecurity.setInitialLoadEnabled(initialLoadEnabled);
if (initialLoadEnabled) {
Expand All @@ -182,6 +197,32 @@ public boolean setInitialLoadEnabled(String nodeId, boolean initialLoadEnabled)
}
return false;
}

/**
* Generate a secure random password for a node.
*/
// TODO: nodeGenerator.generatePassword();
public String generatePassword() {
return new RandomDataImpl().nextSecureHexString(30);
}

/**
* Generate the next node ID that is available. Try to use the domain ID as
* the node ID.
*/
// TODO: nodeGenerator.generateNodeId();
public String generateNodeId(String nodeGroupId, String externalId) {
String nodeId = externalId;
int maxTries = 100;
for (int sequence = 0; sequence < maxTries; sequence++) {
if (findNode(nodeId) == null) {
return nodeId;
}
nodeId = externalId + "-" + sequence;
}
throw new RuntimeException("Could not find nodeId for externalId of " + externalId + " after " + maxTries
+ " tries.");
}

class NodeRowMapper implements RowMapper {
public Object mapRow(ResultSet rs, int num) throws SQLException {
Expand Down
Expand Up @@ -30,8 +30,10 @@
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IPushService;
import org.jumpmind.symmetric.transport.AuthenticationException;
Expand All @@ -52,6 +54,8 @@ public class PushService extends AbstractService implements IPushService {

private INodeService nodeService;

private IDataService dataService;

public void pushData() {
List<Node> nodes = nodeService.findNodesToPushTo();
if (nodes != null && nodes.size() > 0) {
Expand All @@ -70,6 +74,13 @@ private boolean pushToNode(Node remote) {
IOutgoingWithResponseTransport transport = null;
boolean success = false;
try {
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(remote.getNodeId());
if (nodeSecurity != null) {
if (nodeSecurity.isInitialLoadEnabled()) {
dataService.insertReloadEvent(remote);
}
}

transport = transportManager.getPushTransport(remote, nodeService.findIdentity());

if (extractor.extract(remote, transport)) {
Expand Down Expand Up @@ -132,4 +143,8 @@ public void setNodeService(INodeService nodeService) {
public void setAckService(IAcknowledgeService ackService) {
this.ackService = ackService;
}

public void setDataService(IDataService dataService) {
this.dataService = dataService;
}
}
Expand Up @@ -30,7 +30,6 @@
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.random.RandomDataImpl;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
Expand Down Expand Up @@ -201,7 +200,7 @@ protected boolean writeConfiguration(Node node, OutputStream out) throws IOExcep
* external ID will be given this information.
*/
public void reOpenRegistration(String nodeId) {
String password = generatePassword();
String password = nodeService.generatePassword();
jdbcTemplate.update(getSql("reopenRegistrationSql"), new Object[] { password, nodeId });
}

Expand All @@ -213,39 +212,13 @@ public void reOpenRegistration(String nodeId) {
* this node group and external ID will be given this information.
*/
public void openRegistration(String nodeGroup, String externalId) {
String nodeId = generateNodeId(nodeGroup, externalId);
String password = generatePassword();
String nodeId = nodeService.generateNodeId(nodeGroup, externalId);
String password = nodeService.generatePassword();
jdbcTemplate.update(getSql("openRegistrationNodeSql"), new Object[] { nodeId, nodeGroup, externalId });
jdbcTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { nodeId, password });
clusterService.initLockTableForNode(nodeService.findNode(nodeId));
}

/**
* Generate a secure random password for a node.
*/
// TODO: nodeGenerator.generatePassword();
protected String generatePassword() {
return new RandomDataImpl().nextSecureHexString(30);
}

/**
* Generate the next node ID that is available. Try to use the domain ID as
* the node ID.
*/
// TODO: nodeGenerator.generateNodeId();
protected String generateNodeId(String nodeGroupId, String externalId) {
String nodeId = externalId;
int maxTries = 100;
for (int sequence = 0; sequence < maxTries; sequence++) {
if (nodeService.findNode(nodeId) == null) {
return nodeId;
}
nodeId = externalId + "-" + sequence;
}
throw new RuntimeException("Could not find nodeId for externalId of " + externalId + " after " + maxTries
+ " tries.");
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}
Expand Down
6 changes: 6 additions & 0 deletions symmetric/src/main/resources/sql/node-service-sql.xml
Expand Up @@ -87,6 +87,12 @@
registration_time = ?, initial_load_enabled = ?, initial_load_time = ? where node_id = ?
</value>
</entry>
<entry key="insertNodeSecuritySql">
<value>
insert into ${sync.table.prefix}_node_security (node_id, node_password) values (?, ?)
</value>
</entry>

</util:map>

</beans>
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -114,6 +114,7 @@
<property name="extractor" ref="dataExtractorService" />
<property name="transportManager" ref="transportManager" />
<property name="parameterService" ref="parameterService" />
<property name="dataService" ref="dataService" />
<property name="ackService" ref="acknowledgeService" />
<property name="nodeService" ref="nodeService" />
</bean>
Expand Down
Expand Up @@ -92,4 +92,12 @@ public Node findIdentity(boolean useCache) {
// TODO Auto-generated method stub
return null;
}

public String generateNodeId(String nodeGroupId, String externalId) {
return null;
}

public String generatePassword() {
return null;
}
}
Expand Up @@ -27,7 +27,7 @@
import org.junit.runners.Suite.SuiteClasses;

@RunWith(ParameterizedSuite.class)
@SuiteClasses( { SimpleIntegrationTest.class, CleanupTest.class })
@SuiteClasses( { SimpleIntegrationTest.class, LoadFromClientIntegrationTest.class, CleanupTest.class })
public class IntegrationTestSuite {

static final String TEST_PREFIX = "test";
Expand Down
@@ -0,0 +1,51 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.test;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.jdbc.core.JdbcTemplate;

public class LoadFromClientIntegrationTest extends AbstractIntegrationTest {

static final Log logger = LogFactory.getLog(LoadFromClientIntegrationTest.class);

public LoadFromClientIntegrationTest() throws Exception {
}

public LoadFromClientIntegrationTest(String client, String root) throws Exception {
super(client, root);
}

@Test(timeout = 30000)
public void registerClientWithRoot() {
getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, TestConstants.TEST_CLIENT_EXTERNAL_ID);
getClientEngine().start();
String result = getClientEngine().reloadNode("00000");
Assert.assertTrue(result, result.startsWith("Successfully opened initial load for node"));
getClientEngine().push();
JdbcTemplate jdbcTemplate = getClientDbDialect().getJdbcTemplate();
int initialLoadEnabled = jdbcTemplate.queryForInt("select initial_load_enabled from sym_node_security where node_id='00000'");
Assert.assertEquals(0, initialLoadEnabled);
}

}
6 changes: 6 additions & 0 deletions symmetric/src/test/resources/test-tables-ddl.xml
Expand Up @@ -71,6 +71,12 @@
<table name="ONE_COLUMN_TABLE">
<column name="MY_ONE_COLUMN" type="INTEGER" primaryKey="true" required="true" />
</table>

<table name="INITIAL_LOAD_FROM_CLIENT_TABLE">
<column name="ID" type="INTEGER" primaryKey="true" required="true" />
<column name="DATA" type="VARCHAR" size="10" />
</table>


<table name="TEST_ALL_CAPS">
<column name="ALL_CAPS_ID" type="INTEGER" primaryKey="true" required="true" />
Expand Down

0 comments on commit 6cd2893

Please sign in to comment.