Skip to content

Commit

Permalink
[ 1913542 ] Collect more info about errors in batches
Browse files Browse the repository at this point in the history
save the extended ack into outgoing_batch_hist from a push
  • Loading branch information
erilong committed Apr 17, 2008
1 parent 8ba6478 commit dedabfe
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 174 deletions.
Expand Up @@ -23,18 +23,14 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.math.NumberUtils;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.web.WebConstants;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -65,52 +61,6 @@ public void ack(final BatchInfo batch) {
history.setEndTime(new Date());
outgoingBatchService.insertOutgoingBatchHistory(history);
}

public List<BatchInfo> getBatchInfo(Map<String, Object> parameters) {
List<BatchInfo> batches = new ArrayList<BatchInfo>();
for (String parameterName : parameters.keySet()) {
if (parameterName.startsWith(WebConstants.ACK_BATCH_NAME)) {
String batchId = parameterName.substring(WebConstants.ACK_BATCH_NAME.length());
BatchInfo batchInfo = new BatchInfo(batchId);
batchInfo.setNodeId(getParameter(parameters, WebConstants.NODE_ID));
batchInfo.setNetworkMillis(getParameterAsNumber(parameters, WebConstants.ACK_NETWORK_MILLIS));
batchInfo.setFilterMillis(getParameterAsNumber(parameters, WebConstants.ACK_FILTER_MILLIS));
batchInfo.setDatabaseMillis(getParameterAsNumber(parameters, WebConstants.ACK_DATABASE_MILLIS));
batchInfo.setByteCount(getParameterAsNumber(parameters, WebConstants.ACK_BYTE_COUNT));
String status = getParameter(parameters, parameterName, "");
batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK));

if (!batchInfo.isOk()) {
batchInfo.setErrorLine(NumberUtils.toLong(status));
batchInfo.setSqlState(getParameter(parameters, WebConstants.ACK_SQL_STATE));
batchInfo.setSqlCode((int) getParameterAsNumber(parameters, WebConstants.ACK_SQL_CODE));
batchInfo.setSqlMessage(getParameter(parameters, WebConstants.ACK_SQL_MESSAGE));
}
batches.add(batchInfo);
}
}
return batches;
}

private long getParameterAsNumber(Map<String, Object> parameters, String parameterName) {
return NumberUtils.toLong(getParameter(parameters, parameterName));
}

private String getParameter(Map<String, Object> parameters, String parameterName, String defaultValue) {
String value = getParameter(parameters, parameterName);
return value == null ? defaultValue : value;
}

private String getParameter(Map<String, Object> parameters, String parameterName) {
Object value = parameters.get(parameterName);
if (value instanceof String[]) {
String[] arrayValue = (String[]) value;
if (arrayValue.length > 0) {
value = arrayValue[0];
}
}
return (String) value;
}

class CallBackHandler implements RowCallbackHandler {
int index = 0;
Expand Down
Expand Up @@ -23,9 +23,7 @@
import java.io.BufferedReader;
import java.net.ConnectException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -41,7 +39,6 @@
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.web.WebConstants;

public class PushService implements IPushService {

Expand All @@ -59,82 +56,45 @@ public void pushData() {
List<Node> nodes = nodeService.findNodesToPushTo();
if (nodes != null && nodes.size() > 0) {
logger.info("Push requested");
boolean success = false;
for (Node node : nodes) {
pushToNode(node);
success = pushToNode(node);
}
logger.info("Push request completed");
}
}

class ParameterParser {
private StringTokenizer tokenizer;

ParameterParser(String string) {
tokenizer = new StringTokenizer(string, "&;=");
}

BatchInfo nextBatch() {
if (!tokenizer.hasMoreTokens()) {
return null;
}

String s = tokenizer.nextToken();
String[] elements = s.split("-");
assert (elements.length == 2);
assert (elements[0].equalsIgnoreCase("batch"));
String batchId = elements[1];

if (!tokenizer.hasMoreTokens()) {
throw new RuntimeException("Batch ack for batch " + batchId
+ " doesn't have a status.");
}

String status = tokenizer.nextToken();

if (status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK)) {
return new BatchInfo(batchId);
} else {
int line = Integer.parseInt(status);
return new BatchInfo(batchId, line);
if (success) {
logger.info("Push completed");
}
}
}

private void pushToNode(Node remote) {
private boolean pushToNode(Node remote) {
IOutgoingWithResponseTransport transport = null;
boolean success = false;
try {
IOutgoingWithResponseTransport transport = transportManager
.getPushTransport(remote, nodeService.findIdentity());
transport = transportManager.getPushTransport(remote, nodeService.findIdentity());

try {
if (extractor.extract(remote, transport)) {
logger.info("Push data sent");

if (!extractor.extract(remote, transport)) {
return;
}
BufferedReader reader = transport.readResponse();
String ackString = reader.readLine();
String ackExtendedString = reader.readLine();

logger.debug("Just pushed data, about to read the response.");
if (logger.isDebugEnabled()) {
logger.debug("Reading ack: " + ackString);
logger.debug("Reading extended ack: " + ackExtendedString);
}

BufferedReader reader = transport.readResponse();
ParameterParser parser = new ParameterParser(reader.readLine());
List<BatchInfo> batches = transportManager.readAcknowledgement(ackString, ackExtendedString);

List<BatchInfo> batches = new ArrayList<BatchInfo>();
BatchInfo batchInfo = parser.nextBatch();
while (batchInfo != null) {
for (BatchInfo batchInfo : batches) {
if (logger.isDebugEnabled()) {
logger.debug("Ack -- Batch: " + batchInfo.getBatchId()
+ " outcome: "
logger.debug("Saving ack: " + batchInfo.getBatchId() + ", "
+ (batchInfo.isOk() ? "OK" : "error"));
}
batches.add(batchInfo);
batchInfo = parser.nextBatch();
ackService.ack(batchInfo);
}

for (BatchInfo batch : batches) {
ackService.ack(batch);
}

} finally {
transport.close();
}
success = true;
} catch (ConnectException ex) {
logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " url=" + remote.getSyncURL());
} catch (ConnectionRejectedException ex) {
Expand All @@ -144,12 +104,18 @@ private void pushToNode(Node remote) {
} catch (TransportException ex) {
logger.warn(ex.getMessage());
} catch (AuthenticationException ex) {
logger.warn(ErrorConstants.NOT_AUTHENTICATED);
logger.warn(ErrorConstants.NOT_AUTHENTICATED);
} catch (Exception e) {
// just report the error because we want to push to other nodes
// in our list
logger.error(e, e);
} finally {
try {
transport.close();
} catch (Exception e) {
}
}
return success;
}

public void setExtractor(IDataExtractorService extractor) {
Expand Down
Expand Up @@ -21,9 +21,15 @@
package org.jumpmind.symmetric.transport;

import java.io.IOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.math.NumberUtils;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.IncomingBatchHistory;
import org.jumpmind.symmetric.model.IncomingBatchHistory.Status;
import org.jumpmind.symmetric.web.WebConstants;
Expand All @@ -43,13 +49,13 @@ protected String getAcknowledgementData(List<IncomingBatchHistory> list) throws
}
append(builder, WebConstants.ACK_BATCH_NAME + status.getBatchId(), value);
}
return builder.toString();
}

protected String getExtendedAcknowledgementData(List<IncomingBatchHistory> list) throws IOException {
StringBuilder builder = new StringBuilder();
// For backwards compatibility with 1.3 and earlier, the first line is the
// original acknowledgement data and the second line contains more information
builder.append("\n");
for (IncomingBatchHistory status : list) {
String batchId = status.getBatchId();
append(builder, WebConstants.ACK_NODE_ID + batchId, status.getNodeId());
append(builder, WebConstants.ACK_NETWORK_MILLIS + batchId, status.getNetworkMillis());
append(builder, WebConstants.ACK_FILTER_MILLIS + batchId, status.getFilterMillis());
append(builder, WebConstants.ACK_DATABASE_MILLIS + batchId, status.getDatabaseMillis());
Expand All @@ -75,4 +81,77 @@ protected void append(StringBuilder builder, String name, Object value) throws I
builder.append(name).append("=").append(URLEncoder.encode(value.toString(), ENCODING));
}

public List<BatchInfo> readAcknowledgement(String parameterString1, String parameterString2)
throws IOException {
return readAcknowledgement(parameterString1 + "&" + parameterString2);
}

public List<BatchInfo> readAcknowledgement(String parameterString) throws IOException {
Map<String, Object> parameters = getParametersFromQueryUrl(parameterString.replace("\n", ""));
return readAcknowledgement(parameters);
}

public List<BatchInfo> readAcknowledgement(Map<String, Object> parameters) {
List<BatchInfo> batches = new ArrayList<BatchInfo>();
for (String parameterName : parameters.keySet()) {
if (parameterName.startsWith(WebConstants.ACK_BATCH_NAME)) {
String batchId = parameterName.substring(WebConstants.ACK_BATCH_NAME.length());
BatchInfo batchInfo = getBatchInfo(parameters, batchId);
batches.add(batchInfo);
}
}
return batches;
}

private BatchInfo getBatchInfo(Map<String, Object> parameters, String batchId) {
BatchInfo batchInfo = new BatchInfo(batchId);
batchInfo.setNodeId(getParam(parameters, WebConstants.ACK_NODE_ID + batchId));
batchInfo.setNetworkMillis(getParamAsNum(parameters, WebConstants.ACK_NETWORK_MILLIS + batchId));
batchInfo.setFilterMillis(getParamAsNum(parameters, WebConstants.ACK_FILTER_MILLIS + batchId));
batchInfo.setDatabaseMillis(getParamAsNum(parameters, WebConstants.ACK_DATABASE_MILLIS + batchId));
batchInfo.setByteCount(getParamAsNum(parameters, WebConstants.ACK_BYTE_COUNT + batchId));
String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "");
batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK));

if (!batchInfo.isOk()) {
batchInfo.setErrorLine(NumberUtils.toLong(status));
batchInfo.setSqlState(getParam(parameters, WebConstants.ACK_SQL_STATE + batchId));
batchInfo.setSqlCode((int) getParamAsNum(parameters, WebConstants.ACK_SQL_CODE + batchId));
batchInfo.setSqlMessage(getParam(parameters, WebConstants.ACK_SQL_MESSAGE + batchId));
}
return batchInfo;
}

protected Map<String, Object> getParametersFromQueryUrl(String parameterString) throws IOException {
Map<String, Object> parameters = new HashMap<String, Object>();
String[] tokens = parameterString.split("&");
for (String param : tokens) {
String[] nameValuePair = param.split("=");
if (nameValuePair.length == 2) {
parameters.put(nameValuePair[0], URLDecoder.decode(nameValuePair[1], ENCODING));
}
}
return parameters;
}

private long getParamAsNum(Map<String, Object> parameters, String parameterName) {
return NumberUtils.toLong(getParam(parameters, parameterName));
}

private String getParam(Map<String, Object> parameters, String parameterName, String defaultValue) {
String value = getParam(parameters, parameterName);
return value == null ? defaultValue : value;
}

private String getParam(Map<String, Object> parameters, String parameterName) {
Object value = parameters.get(parameterName);
if (value instanceof String[]) {
String[] arrayValue = (String[]) value;
if (arrayValue.length > 0) {
value = arrayValue[0];
}
}
return (String) value;
}

}
Expand Up @@ -23,20 +23,30 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.IncomingBatchHistory;
import org.jumpmind.symmetric.model.Node;

public interface ITransportManager {

public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException;
public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local)
throws IOException;

public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException;

public List<BatchInfo> readAcknowledgement(String parameterString1, String parameterString2)
throws IOException;

public List<BatchInfo> readAcknowledgement(String parameterString) throws IOException;

public List<BatchInfo> readAcknowledgement(Map<String, Object> parameters);

public IIncomingTransport getPullTransport(Node remote, Node local) throws IOException;

public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException;

public IIncomingTransport getRegisterTransport(Node node) throws IOException;

}
Expand Up @@ -70,14 +70,14 @@ public HttpTransportManager(IRuntimeConfig config, INodeService nodeService, int

public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
if (list != null && list.size() > 0) {
String data = getAcknowledgementData(list) + "\n" + getExtendedAcknowledgementData(list);
String data = getAcknowledgementData(list);
return sendMessage("ack", remote, local, data);
}
return true;
}

public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {
writeMessage(out, getAcknowledgementData(list) + "\n" + getExtendedAcknowledgementData(list));
writeMessage(out, getAcknowledgementData(list));
}

public boolean sendMessage(String action, Node remote, Node local, String data) throws IOException {
Expand Down

0 comments on commit dedabfe

Please sign in to comment.