Skip to content

Commit

Permalink
0001141: Prevent invalid warning that expected ack was not received
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 26, 2013
1 parent 3e7b6a5 commit 068d883
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 113 deletions.
Expand Up @@ -47,7 +47,7 @@ public IDataWriter getDataWriter(final String sourceNodeId,
protected void beforeResolutionAttempt(Conflict conflict) {
if (conflict.getPingBack() != PingBack.OFF) {
DatabaseWriter writer = (DatabaseWriter) transformWriter
.getTargetWriter();
.getNestedWriter();
ISqlTransaction transaction = writer.getTransaction();
if (transaction != null) {
symmetricDialect.enableSyncTriggers(transaction);
Expand All @@ -59,7 +59,7 @@ protected void beforeResolutionAttempt(Conflict conflict) {
protected void afterResolutionAttempt(Conflict conflict) {
if (conflict.getPingBack() == PingBack.SINGLE_ROW) {
DatabaseWriter writer = (DatabaseWriter) transformWriter
.getTargetWriter();
.getNestedWriter();
ISqlTransaction transaction = writer.getTransaction();
if (transaction != null) {
symmetricDialect.disableSyncTriggers(transaction, sourceNodeId);
Expand Down
Expand Up @@ -30,7 +30,7 @@ public class ProcessInfo implements Serializable, Comparable<ProcessInfo> {
private static final long serialVersionUID = 1L;

public static enum Status {
NEW, EXTRACTING, LOADING, TRANSFERRING, DONE, ERROR
NEW, EXTRACTING, LOADING, TRANSFERRING, ACKING, DONE, ERROR
};

private ProcessInfoKey key;
Expand Down Expand Up @@ -91,6 +91,9 @@ public Status getStatus() {
public void setStatus(Status status) {
this.status = status;
this.lastStatusChangeTime = new Date();
if (status == Status.DONE || status == Status.ERROR) {
this.endTime = new Date();
}
}

public long getDataCount() {
Expand Down
Expand Up @@ -20,69 +20,49 @@
*/
package org.jumpmind.symmetric.model;

import java.util.Map;

import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;

public class ProcessInfoDataWriter implements IDataWriter {
import org.jumpmind.symmetric.io.data.writer.NestedDataWriter;

private IDataWriter targetWriter;
public class ProcessInfoDataWriter extends NestedDataWriter {

private ProcessInfo processInfo;

public ProcessInfoDataWriter(IDataWriter targetWriter, ProcessInfo processInfo) {
this.targetWriter = targetWriter;
super(targetWriter);
this.processInfo = processInfo;
}

public void open(DataContext context) {
targetWriter.open(context);
super.open(context);
processInfo.setDataCount(0);
processInfo.setBatchCount(0);
}

public void close() {
targetWriter.close();
}

public Map<Batch, Statistics> getStatistics() {
return targetWriter.getStatistics();
}

public void start(Batch batch) {
if (batch != null) {
processInfo.setCurrentBatchId(batch.getBatchId());
processInfo.setCurrentChannelId(batch.getChannelId());
processInfo.incrementBatchCount();
}
targetWriter.start(batch);
super.start(batch);
}

public boolean start(Table table) {
if (table != null) {
processInfo.setCurrentTableName(table.getFullyQualifiedTableName());
}
return targetWriter.start(table);
return super.start(table);
}

public void write(CsvData data) {
if (data != null) {
processInfo.incrementDataCount();
}
targetWriter.write(data);
}

public void end(Table table) {
targetWriter.end(table);
}

public void end(Batch batch, boolean inError) {
targetWriter.end(batch, inError);
super.write(data);
}

}
Expand Up @@ -514,7 +514,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode);
new DataProcessor(dataReader, transformExtractWriter).process(ctx);
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = transformExtractWriter.getTargetWriter()
Statistics stats = transformExtractWriter.getNestedWriter()
.getStatistics().values().iterator().next();
byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
}
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ErrorConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -85,11 +86,15 @@
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.IncomingBatch.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.IncomingError;
import org.jumpmind.symmetric.model.LoadFilter;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataLoaderService;
Expand Down Expand Up @@ -198,9 +203,24 @@ public List<String> getAvailableDataLoaderFactories() {
}

public List<IncomingBatch> loadDataBatch(String batchData) throws IOException {
InternalIncomingTransport transport = new InternalIncomingTransport(new BufferedReader(
new StringReader(batchData)));
return loadDataFromTransport(nodeService.findIdentity(), transport);
String nodeId = nodeService.findIdentityNodeId();
if (StringUtils.isNotBlank(nodeId)) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeId,
nodeId, ProcessInfoKey.ProcessType.MANUAL_LOAD));
try {
InternalIncomingTransport transport = new InternalIncomingTransport(
new BufferedReader(new StringReader(batchData)));
List<IncomingBatch> list = loadDataFromTransport(processInfo,
nodeService.findIdentity(), transport);
processInfo.setStatus(ProcessInfo.Status.DONE);
return list;
} catch (RuntimeException ex) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw ex;
}
} else {
return new ArrayList<IncomingBatch>(0);
}
}

/**
Expand Down Expand Up @@ -240,25 +260,35 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
remote.setSyncUrl(parameterService.getRegistrationUrl());
}

List<IncomingBatch> list = loadDataFromTransport(remote, transport);
if (list.size() > 0) {
status.updateIncomingStatus(list);
local = nodeService.findIdentity();
if (local != null) {
localSecurity = nodeService.findNodeSecurity(local.getNodeId());
if (StringUtils.isNotBlank(transport.getRedirectionUrl())) {
/*
* We were redirected for the pull, we need to redirect
* for the ack
*/
String url = transport.getRedirectionUrl();
url = url.replace(HttpTransportManager.buildRegistrationUrl("", local), "");
remote.setSyncUrl(url);
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote.getNodeId(), local.getNodeId(), ProcessType.PULL_JOB));
try {
List<IncomingBatch> list = loadDataFromTransport(processInfo, remote, transport);
if (list.size() > 0) {
processInfo.setStatus(ProcessInfo.Status.ACKING);
status.updateIncomingStatus(list);
local = nodeService.findIdentity();
if (local != null) {
localSecurity = nodeService.findNodeSecurity(local.getNodeId());
if (StringUtils.isNotBlank(transport.getRedirectionUrl())) {
/*
* We were redirected for the pull, we need to
* redirect for the ack
*/
String url = transport.getRedirectionUrl();
url = url.replace(HttpTransportManager.buildRegistrationUrl("", local),
"");
remote.setSyncUrl(url);
}
sendAck(remote, local, localSecurity, list);
}
sendAck(remote, local, localSecurity, list);
}
processInfo.setStatus(ProcessInfo.Status.DONE);
} catch (RuntimeException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw e;
}


} catch (RegistrationRequiredException e) {
if (StringUtils.isBlank(remote.getSyncUrl()) || remote.getSyncUrl().equals(parameterService.getRegistrationUrl())) {
log.warn("Node information missing on the server. Attempting to re-register");
Expand Down Expand Up @@ -287,12 +317,25 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
*/
public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out)
throws IOException {
List<IncomingBatch> list = loadDataFromTransport(sourceNode, new InternalIncomingTransport(
in));
Node local = nodeService.findIdentity();
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
transportManager.writeAcknowledgement(out, sourceNode, list, local,
security != null ? security.getNodePassword() : null);
if (local != null) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode
.getNodeId(), local.getNodeId(), ProcessInfoKey.ProcessType.PUSH_HANDLER));
try {
List<IncomingBatch> list = loadDataFromTransport(processInfo, sourceNode,
new InternalIncomingTransport(in));
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
processInfo.setStatus(ProcessInfo.Status.ACKING);
transportManager.writeAcknowledgement(out, sourceNode, list, local,
security != null ? security.getNodePassword() : null);
processInfo.setStatus(ProcessInfo.Status.DONE);
} catch (RuntimeException e) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
throw e;
}
} else {
throw new SymmetricException("Could not load data because the node is not registered");
}
}

public void addDatabaseWriterFilter(IDatabaseWriterFilter filter) {
Expand Down Expand Up @@ -359,7 +402,7 @@ protected void sendAck(Node remote, Node local, NodeSecurity localSecurity,
* is used for a pull request that responds with data, and the
* acknowledgment is sent later.
*/
protected List<IncomingBatch> loadDataFromTransport(final Node sourceNode,
protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInfo, final Node sourceNode,
IIncomingTransport transport) throws IOException {
final ManageIncomingBatchListener listener = new ManageIncomingBatchListener();
final DataContext ctx = new DataContext();
Expand All @@ -375,19 +418,20 @@ protected List<IncomingBatch> loadDataFromTransport(final Node sourceNode,
long totalNetworkMillis = System.currentTimeMillis();
String targetNodeId = nodeService.findIdentityNodeId();
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
IDataReader dataReader = new ProtocolDataReader(BatchType.LOAD, targetNodeId,
transport.open());
IDataWriter dataWriter = new StagingDataWriter(sourceNode.getNodeId(),
Constants.STAGING_CATEGORY_INCOMING, stagingManager,
new LoadIntoDatabaseOnArrivalListener(sourceNode.getNodeId(), listener));
new LoadIntoDatabaseOnArrivalListener(processInfo, sourceNode.getNodeId(), listener));
new DataProcessor(dataReader, dataWriter).process(ctx);
totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
} else {
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
targetNodeId, transport.open()), null, listener) {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(sourceNode.getNodeId(), batch.getChannelId(),
return buildDataWriter(processInfo, sourceNode.getNodeId(), batch.getChannelId(),
batch.getBatchId());
}
};
Expand Down Expand Up @@ -448,7 +492,7 @@ protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException {
}
}

protected IDataWriter buildDataWriter(String sourceNodeId, String channelId, long batchId) {
protected IDataWriter buildDataWriter(ProcessInfo processInfo, String sourceNodeId, String channelId, long batchId) {
TransformTable[] transforms = null;
NodeGroupLink link = null;
List<ResolvedData> resolvedDatas = new ArrayList<ResolvedData>();
Expand Down Expand Up @@ -498,7 +542,7 @@ protected IDataWriter buildDataWriter(String sourceNodeId, String channelId, lon
IDataWriter targetWriter = getFactory(channelId).getDataWriter(sourceNodeId,
symmetricDialect, transformWriter, dynamicFilters, dynamicErrorHandlers,
getConflictSettingsNodeGroupLinks(link, false), resolvedDatas);
transformWriter.setTargetWriter(targetWriter);
transformWriter.setNestedWriter(new ProcessInfoDataWriter(targetWriter, processInfo));
return transformWriter;
}

Expand Down Expand Up @@ -717,15 +761,19 @@ class LoadIntoDatabaseOnArrivalListener implements IProtocolDataWriterListener {
private long batchStartsToArriveTimeInMs;

private String sourceNodeId;

private ProcessInfo processInfo;

public LoadIntoDatabaseOnArrivalListener(String sourceNodeId,
public LoadIntoDatabaseOnArrivalListener(ProcessInfo processInfo, String sourceNodeId,
ManageIncomingBatchListener listener) {
this.sourceNodeId = sourceNodeId;
this.listener = listener;
this.processInfo = processInfo;
}

public void start(DataContext ctx, Batch batch) {
batchStartsToArriveTimeInMs = System.currentTimeMillis();
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
}

public void end(DataContext ctx, Batch batch, IStagedResource resource) {
Expand All @@ -738,11 +786,13 @@ public void end(DataContext ctx, Batch batch, IStagedResource resource) {
}

try {

processInfo.setStatus(ProcessInfo.Status.LOADING);
DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
batch.getTargetNodeId(), resource), null, listener) {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
return buildDataWriter(sourceNodeId, batch.getChannelId(),
return buildDataWriter(processInfo, sourceNodeId, batch.getChannelId(),
batch.getBatchId());
}
};
Expand Down
Expand Up @@ -94,7 +94,7 @@ protected void init() {
public ProcessInfo newProcessInfo(ProcessInfoKey key) {
ProcessInfo process = new ProcessInfo(key);
ProcessInfo old = processInfos.get(key);
if (old != null && old.getStatus() != Status.DONE) {
if (old != null && (old.getStatus() != Status.DONE && old.getStatus() != Status.ERROR)) {
log.warn("Starting a new process even though the previous one ({}) had not finished", old.toString());
}
processInfos.put(key, process);
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.data.writer.NestedDataWriter;
import org.jumpmind.util.Context;

public class DataContext extends Context {
Expand Down Expand Up @@ -120,10 +120,10 @@ public void setLastParsedTable(Table lastParsedTable) {

public ISqlTransaction findTransaction() {
ISqlTransaction transaction = null;
if (writer instanceof TransformWriter) {
IDataWriter targetWriter = ((TransformWriter) writer).getTargetWriter();
if (targetWriter instanceof DatabaseWriter) {
transaction = ((DatabaseWriter) targetWriter).getTransaction();
if (writer instanceof NestedDataWriter) {
DatabaseWriter dbWriter = ((NestedDataWriter)writer).getNestedWriterOfType(DatabaseWriter.class);
if (dbWriter != null) {
transaction = dbWriter.getTransaction();
}
} else if (writer instanceof DatabaseWriter) {
transaction = ((DatabaseWriter) writer).getTransaction();
Expand Down

0 comments on commit 068d883

Please sign in to comment.