Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0001087: If engine.name isn't used, then some services are not availa…
…ble. Should be available as long as there is only one node deployed

0001086: Make the 3.x use 1.x protocol for backwards compatibility
  • Loading branch information
chenson42 committed Mar 2, 2013
1 parent e9ca0ea commit 7102d91
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 98 deletions.
Expand Up @@ -29,13 +29,17 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class represents a node who has registered for sync updates.
*/
public class Node implements Serializable {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;

private static final Logger log = LoggerFactory.getLogger(Node.class);

private int MAX_VERSION_SIZE = 50;

Expand Down Expand Up @@ -73,8 +77,8 @@ public class Node implements Serializable {

private int batchInErrorCount;

private String deploymentType;

private String deploymentType;
public Node() {
}

Expand Down Expand Up @@ -222,6 +226,24 @@ public void setDeploymentType(String deploymentType) {

public String getDeploymentType() {
return deploymentType;
}

public boolean requires13Compatiblity() {
if (symmetricVersion != null) {
if (symmetricVersion.equals("development")) {
return false;
}
try {
int[] currentVersion = Version.parseVersion(symmetricVersion);
return currentVersion != null && currentVersion.length > 0 && currentVersion[0] <= 1;
} catch (Exception ex) {
log.warn(
"Could not parse the version {} for node {}. Setting backwards compatibility mode to true",
symmetricVersion, nodeId);
return true;
}
}
return false;
}

public boolean isVersionGreaterThanOrEqualTo(int... targetVersion) {
Expand Down
Expand Up @@ -100,7 +100,8 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
String nodeIdInQuestion = columnValues.get("NODE_ID");
List<NodeGroupLink> nodeGroupLinks = getNodeGroupLinksFromContext(routingContext);
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
if (!nodeThatMayBeRoutedTo.requires13Compatiblity() &&
isLinked(nodeIdInQuestion, nodeThatMayBeRoutedTo, rootNetworkedNode, me,
nodeGroupLinks)
&& !isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo,
rootNetworkedNode, me)
Expand Down Expand Up @@ -185,7 +186,8 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
}
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
if (!nodeThatMayBeRoutedTo.requires13Compatiblity() &&
nodeThatMayBeRoutedTo.getNodeId().equals(sourceNodeId)) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
Expand All @@ -196,9 +198,10 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData

} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (!isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode,
if (!nodeThatMayBeRoutedTo.requires13Compatiblity() && (
!isSameNumberOfLinksAwayFromRoot(nodeThatMayBeRoutedTo, rootNetworkedNode,
me)
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad)) {
|| (nodeThatMayBeRoutedTo.getNodeId().equals(me.getNodeId()) && initialLoad))) {
if (nodeIds == null) {
nodeIds = new HashSet<String>();
}
Expand Down
Expand Up @@ -230,7 +230,7 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,
source);

ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer);
writer, targetNode.requires13Compatiblity());
DataProcessor processor = new DataProcessor(dataReader, dataWriter);
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
Expand Down Expand Up @@ -349,7 +349,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,

if (dataWriter == null) {
dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
targetTransport.open());
targetTransport.open(), targetNode.requires13Compatiblity());
}

currentBatch = extractOutgoingBatch(targetNode, dataWriter, currentBatch,
Expand Down Expand Up @@ -635,16 +635,18 @@ public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId);
if (batch != null) {
Node targetNode = nodeService.findNode(nodeId);
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(batch, sourceNode, targetNode));
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, createTransformDataWriter(nodeService.findIdentity(),
targetNode,
new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer)))
.process(ctx);
foundBatch = true;
if (targetNode != null) {
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(batch, sourceNode, targetNode));
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, createTransformDataWriter(
nodeService.findIdentity(), targetNode,
new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer,
targetNode.requires13Compatiblity()))).process(ctx);
foundBatch = true;
}
}
}
return foundBatch;
Expand Down
Expand Up @@ -289,7 +289,7 @@ public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out)
in));
Node local = nodeService.findIdentity();
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
transportManager.writeAcknowledgement(out, list, local,
transportManager.writeAcknowledgement(out, sourceNode, list, local,
security != null ? security.getNodePassword() : null);
}

Expand Down Expand Up @@ -506,10 +506,6 @@ protected IDataLoaderFactory getFactory(String channelId) {
IDataLoaderFactory factory = null;
if (channel != null) {
dataLoaderType = channel.getDataLoaderType();
} else if (!channelId.equals(Constants.CHANNEL_CONFIG)) {
log.warn(
"Could not locate the channel with the id of '{}'. Using the 'default' data loader.",
channelId);
}

factory = dataLoaderFactories.get(dataLoaderType);
Expand Down
Expand Up @@ -89,34 +89,48 @@ public String resolveURL(String syncUrl, String registrationUrl) {
}
}

protected String getAcknowledgementData(String nodeId, List<IncomingBatch> list) throws IOException {
StringBuilder builder = new StringBuilder();
for (IncomingBatch batch : list) {
long batchId = batch.getBatchId();
Object value = null;
if (batch.getStatus() == Status.OK) {
value = WebConstants.ACK_BATCH_OK;
} else {
value = batch.getFailedRowNumber();
protected String getAcknowledgementData(boolean requires13Format, String nodeId,
List<IncomingBatch> list) throws IOException {
StringBuilder builder = new StringBuilder();
if (!requires13Format) {
for (IncomingBatch batch : list) {
long batchId = batch.getBatchId();
Object value = null;
if (batch.getStatus() == Status.OK) {
value = WebConstants.ACK_BATCH_OK;
} else {
value = batch.getFailedRowNumber();
}
append(builder, WebConstants.ACK_BATCH_NAME + batch.getBatchId(), value);
append(builder, WebConstants.ACK_NODE_ID + batchId, nodeId);
append(builder, WebConstants.ACK_NETWORK_MILLIS + batchId, batch.getNetworkMillis());
append(builder, WebConstants.ACK_FILTER_MILLIS + batchId, batch.getFilterMillis());
append(builder, WebConstants.ACK_DATABASE_MILLIS + batchId,
batch.getDatabaseMillis());
append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount());

if (batch.getIgnoreCount() > 0) {
append(builder, WebConstants.ACK_IGNORE_COUNT + batchId, batch.getIgnoreCount());
}

if (batch.getStatus() == Status.ER) {
append(builder, WebConstants.ACK_SQL_STATE + batchId, batch.getSqlState());
append(builder, WebConstants.ACK_SQL_CODE + batchId, batch.getSqlCode());
append(builder, WebConstants.ACK_SQL_MESSAGE + batchId, batch.getSqlMessage());
}
}
append(builder, WebConstants.ACK_BATCH_NAME + batch.getBatchId(), value);
append(builder, WebConstants.ACK_NODE_ID + batchId, nodeId);
append(builder, WebConstants.ACK_NETWORK_MILLIS + batchId, batch.getNetworkMillis());
append(builder, WebConstants.ACK_FILTER_MILLIS + batchId, batch.getFilterMillis());
append(builder, WebConstants.ACK_DATABASE_MILLIS + batchId, batch.getDatabaseMillis());
append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount());

if (batch.getIgnoreCount() > 0) {
append(builder, WebConstants.ACK_IGNORE_COUNT + batchId, batch.getIgnoreCount());
}

if (batch.getStatus() == Status.ER) {
append(builder, WebConstants.ACK_SQL_STATE + batchId, batch.getSqlState());
append(builder, WebConstants.ACK_SQL_CODE + batchId, batch.getSqlCode());
append(builder, WebConstants.ACK_SQL_MESSAGE + batchId, batch.getSqlMessage());
}
}
return builder.toString();
} else {
for (IncomingBatch batch : list) {
Object value = null;
if (batch.getStatus() == Status.OK || batch.getStatus() == Status.IG) {
value = WebConstants.ACK_BATCH_OK;
} else {
value = batch.getFailedRowNumber();
}
append(builder, WebConstants.ACK_BATCH_NAME + batch.getBatchId(), value);
}
}
return builder.toString();
}

protected static void append(StringBuilder builder, String name, Object value) {
Expand Down Expand Up @@ -157,8 +171,12 @@ public static List<BatchAck> readAcknowledgement(Map<String, ? extends Object> p
}

private static BatchAck getBatchInfo(Map<String, ? extends Object> parameters, long batchId) {
BatchAck batchInfo = new BatchAck(batchId);
batchInfo.setNodeId(getParam(parameters, WebConstants.ACK_NODE_ID + batchId));
BatchAck batchInfo = new BatchAck(batchId);
String nodeId = getParam(parameters, WebConstants.ACK_NODE_ID + batchId);
if (StringUtils.isBlank(nodeId)) {
nodeId = getParam(parameters, WebConstants.NODE_ID);
}
batchInfo.setNodeId(nodeId);
batchInfo.setNetworkMillis(getParamAsNum(parameters, WebConstants.ACK_NETWORK_MILLIS + batchId));
batchInfo.setFilterMillis(getParamAsNum(parameters, WebConstants.ACK_FILTER_MILLIS + batchId));
batchInfo.setDatabaseMillis(getParamAsNum(parameters, WebConstants.ACK_DATABASE_MILLIS + batchId));
Expand Down
Expand Up @@ -37,7 +37,7 @@ public interface ITransportManager {

public int sendAcknowledgement(Node remote, List<IncomingBatch> list, Node local, String securityToken, String registrationUrl) throws IOException;

public void writeAcknowledgement(OutputStream out, List<IncomingBatch> list, Node local, String securityToken) throws IOException;
public void writeAcknowledgement(OutputStream out, Node remote, List<IncomingBatch> list, Node local, String securityToken) throws IOException;

public List<BatchAck> readAcknowledgement(String parameterString1, String parameterString2) throws IOException;

Expand Down
Expand Up @@ -70,15 +70,15 @@ public HttpTransportManager(ISymmetricEngine engine) {
public int sendAcknowledgement(Node remote, List<IncomingBatch> list, Node local,
String securityToken, String registrationUrl) throws IOException {
if (list != null && list.size() > 0) {
String data = getAcknowledgementData(local.getNodeId(), list);
String data = getAcknowledgementData(remote.requires13Compatiblity(), local.getNodeId(), list);
return sendMessage("ack", remote, local, data, securityToken, registrationUrl);
}
return HttpURLConnection.HTTP_OK;
}

public void writeAcknowledgement(OutputStream out, List<IncomingBatch> list, Node local,
public void writeAcknowledgement(OutputStream out, Node remote, List<IncomingBatch> list, Node local,
String securityToken) throws IOException {
writeMessage(out, getAcknowledgementData(local.getNodeId(), list));
writeMessage(out, getAcknowledgementData(remote.requires13Compatiblity(), local.getNodeId(), list));
}

protected int sendMessage(String action, Node remote, Node local, String data,
Expand Down
Expand Up @@ -116,7 +116,7 @@ public int sendAcknowledgement(Node remote, List<IncomingBatch> list,
try {
if (list != null && list.size() > 0) {
ISymmetricEngine remoteEngine = getTargetEngine(remote.getSyncUrl());
String ackData = getAcknowledgementData(local.getNodeId(), list);
String ackData = getAcknowledgementData(remote.requires13Compatiblity(), local.getNodeId(), list);
List<BatchAck> batches = readAcknowledgement(ackData);
for (BatchAck batchInfo : batches) {
remoteEngine.getAcknowledgeService().ack(batchInfo);
Expand All @@ -129,11 +129,11 @@ public int sendAcknowledgement(Node remote, List<IncomingBatch> list,
}
}

public void writeAcknowledgement(OutputStream out,
public void writeAcknowledgement(OutputStream out, Node remote,
List<IncomingBatch> list, Node local, String securityToken)
throws IOException {
PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, IoConstants.ENCODING), true);
pw.println(getAcknowledgementData(local.getNodeId(), list));
pw.println(getAcknowledgementData(remote.requires13Compatiblity(), local.getNodeId(), list));
pw.close();
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public int sendAcknowledgement(Node remote, List<IncomingBatch> list,
return HttpURLConnection.HTTP_OK;
}

public void writeAcknowledgement(OutputStream out,
public void writeAcknowledgement(OutputStream out, Node remote,
List<IncomingBatch> list, Node local, String securityToken)
throws IOException {
}
Expand Down
Expand Up @@ -29,14 +29,16 @@
public class Batch {

public static final long UNKNOWN_BATCH_ID = -9999;

public static final String DEFAULT_CHANNEL_ID = "default";

public enum BatchType { EXTRACT, LOAD };

protected long batchId = UNKNOWN_BATCH_ID;
protected String sourceNodeId;
protected String targetNodeId;
protected boolean initialLoad;
protected String channelId;
protected String channelId = DEFAULT_CHANNEL_ID;
protected BinaryEncoding binaryEncoding;
protected Date startTime;
protected long lineCount;
Expand All @@ -52,7 +54,9 @@ public enum BatchType { EXTRACT, LOAD };
public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) {
this.batchType = batchType;
this.batchId = batchId;
this.channelId = channelId;
if (channelId != null) {
this.channelId = channelId;
}
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.binaryEncoding = binaryEncoding;
Expand Down
Expand Up @@ -236,7 +236,8 @@ public Map<String, String> toColumnNameValuePairs(String[] keyNames, String key)
}

public boolean requiresTable() {
return dataEventType != null && dataEventType != DataEventType.CREATE;
return dataEventType != null && dataEventType != DataEventType.CREATE &&
dataEventType != DataEventType.SQL && dataEventType != DataEventType.BSH;
}

public boolean isNoBinaryOldData() {
Expand Down
Expand Up @@ -20,6 +20,9 @@
*/
package org.jumpmind.symmetric.io.data;

import java.util.HashMap;
import java.util.Map;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
Expand All @@ -39,6 +42,10 @@ public class DataContext extends Context {
protected CsvData data;

protected Throwable lastError;

protected Map<String, Table> parsedTables = new HashMap<String, Table>();

protected Table lastParsedTable = null;

public DataContext(Batch batch) {
this.batch = batch;
Expand Down Expand Up @@ -98,6 +105,18 @@ public void setLastError(Throwable lastError) {
public Throwable getLastError() {
return lastError;
}

public Map<String, Table> getParsedTables() {
return parsedTables;
}

public Table getLastParsedTable() {
return lastParsedTable;
}

public void setLastParsedTable(Table lastParsedTable) {
this.lastParsedTable = lastParsedTable;
}

public ISqlTransaction findTransaction() {
ISqlTransaction transaction = null;
Expand Down

0 comments on commit 7102d91

Please sign in to comment.